| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.blob; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.collect.Lists.newArrayList; |
| import static java.io.File.createTempFile; |
| import static org.apache.commons.io.FileUtils.copyFile; |
| import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy; |
| import static org.apache.jackrabbit.oak.commons.FileIOUtils.merge; |
| import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort; |
| import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly; |
| |
| import java.io.BufferedWriter; |
| import java.io.ByteArrayInputStream; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.LineNumberReader; |
| import java.sql.Timestamp; |
| import java.util.ArrayDeque; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.base.Function; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.StandardSystemProperty; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableListMultimap; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.io.Closeables; |
| import com.google.common.io.Files; |
| import com.google.common.util.concurrent.ListenableFutureTask; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.LineIterator; |
| import org.apache.jackrabbit.core.data.DataRecord; |
| import org.apache.jackrabbit.core.data.DataStoreException; |
| import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; |
| import org.apache.jackrabbit.oak.commons.FileIOUtils; |
| import org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; |
| import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; |
| import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; |
| import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils; |
| import org.apache.jackrabbit.oak.stats.CounterStats; |
| import org.apache.jackrabbit.oak.stats.StatisticsProvider; |
| import org.apache.jackrabbit.oak.stats.StatsOptions; |
| import org.apache.jackrabbit.oak.stats.TimerStats; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Mark and sweep garbage collector. |
| * |
| * Uses the file system to store internal state while in process to account for huge data. |
| * This class is not thread safe. |
| * |
| */ |
| public class MarkSweepGarbageCollector implements BlobGarbageCollector { |
| |
| public static final Logger LOG = LoggerFactory.getLogger(MarkSweepGarbageCollector.class); |
| |
| public static final String TEMP_DIR = StandardSystemProperty.JAVA_IO_TMPDIR.value(); |
| |
| public static final int DEFAULT_BATCH_COUNT = 1024; |
| |
| public static final String DELIM = ","; |
| |
| private static final Function<String, String> transformer = new Function<String, String>() { |
| @Nullable |
| @Override |
| public String apply(@Nullable String input) { |
| if (input != null) { |
| return input.split(DELIM)[0]; |
| } |
| return ""; |
| }}; |
| |
| /** The last modified time before current time of blobs to consider for garbage collection. */ |
| private final long maxLastModifiedInterval; |
| |
| /** The blob store to be garbage collected. */ |
| private final GarbageCollectableBlobStore blobStore; |
| |
| /** Helper class to mark blob references which **/ |
| private final BlobReferenceRetriever marker; |
| |
| private final Executor executor; |
| |
| /** The batch count. */ |
| private final int batchCount; |
| |
| private final String repoId; |
| |
| private final String root; |
| |
| private final Whiteboard whiteboard; |
| |
| private CheckpointMBean checkpointMbean; |
| |
| /** Operation stats object **/ |
| private final GarbageCollectionOperationStats stats; |
| |
| private final OperationStatsCollector statsCollector; |
| |
| /** Operation consistency stats object **/ |
| private final GarbageCollectionOperationStats consistencyStats; |
| |
| private final OperationStatsCollector consistencyStatsCollector; |
| |
| private boolean traceOutput; |
| |
/**
 * Creates an instance of MarkSweepGarbageCollector with every setting supplied explicitly.
 *
 * @param marker BlobReferenceRetriever instance used to fetch the referred blob entries
 * @param blobStore the blob store instance, must not be {@code null}
 * @param executor executor
 * @param root the absolute path of the directory under which temporary
 *        files are created
 * @param batchCount batch size used for saving intermediate state
 * @param maxLastModifiedInterval interval in millis; only blobs modified before the
 *        reference time minus this interval are considered for GC
 * @param repositoryId unique repository id for this node, may be {@code null}
 * @param whiteboard whiteboard instance, may be {@code null}
 * @param statisticsProvider statistics provider instance; {@code null} selects
 *        {@code StatisticsProvider.NOOP}
 * @throws NullPointerException if {@code blobStore} is {@code null}
 * @throws IOException declared on the signature
 */
public MarkSweepGarbageCollector(
        BlobReferenceRetriever marker,
        GarbageCollectableBlobStore blobStore,
        Executor executor,
        String root,
        int batchCount,
        long maxLastModifiedInterval,
        @Nullable String repositoryId,
        @Nullable Whiteboard whiteboard,
        @Nullable StatisticsProvider statisticsProvider)
        throws IOException {
    this.blobStore = checkNotNull(blobStore, "BlobStore cannot be null");
    this.marker = marker;
    this.executor = executor;
    this.root = root;
    this.batchCount = batchCount;
    this.maxLastModifiedInterval = maxLastModifiedInterval;
    this.repoId = repositoryId;
    this.whiteboard = whiteboard;

    // The checkpoint MBean is only available through the whiteboard
    if (whiteboard != null) {
        this.checkpointMbean = WhiteboardUtils.getService(whiteboard, CheckpointMBean.class);
    }

    // Fall back to the no-op provider when none was passed
    StatisticsProvider provider = statisticsProvider != null ? statisticsProvider : StatisticsProvider.NOOP;

    this.stats = new GarbageCollectionOperationStats(provider);
    this.statsCollector = this.stats.getCollector();
    this.consistencyStats =
        new GarbageCollectionOperationStats(provider, GarbageCollectionOperationStats.CONSISTENCY_NAME);
    this.consistencyStatsCollector = this.consistencyStats.getCollector();
}
| |
/**
 * Creates a collector without a whiteboard and without a statistics provider; both are
 * passed as {@code null} to the full constructor.
 */
public MarkSweepGarbageCollector(
        BlobReferenceRetriever marker,
        GarbageCollectableBlobStore blobStore,
        Executor executor,
        String root,
        int batchCount,
        long maxLastModifiedInterval,
        @Nullable String repositoryId)
        throws IOException {
    this(
        marker,
        blobStore,
        executor,
        root,
        batchCount,
        maxLastModifiedInterval,
        repositoryId,
        null,
        null);
}
| |
/**
 * Creates a collector that works under {@link #TEMP_DIR} and uses
 * {@link #DEFAULT_BATCH_COUNT} as its batch size.
 */
public MarkSweepGarbageCollector(
        BlobReferenceRetriever marker,
        GarbageCollectableBlobStore blobStore,
        Executor executor,
        long maxLastModifiedInterval,
        @Nullable String repositoryId,
        @Nullable Whiteboard whiteboard,
        @Nullable StatisticsProvider statisticsProvider)
        throws IOException {
    this(
        marker,
        blobStore,
        executor,
        TEMP_DIR,
        DEFAULT_BATCH_COUNT,
        maxLastModifiedInterval,
        repositoryId,
        whiteboard,
        statisticsProvider);
}
| |
| @Override |
| public void collectGarbage(boolean markOnly) throws Exception { |
| markAndSweep(markOnly, false); |
| } |
| |
| @Override |
| public void collectGarbage(boolean markOnly, boolean forceBlobRetrieve) throws Exception { |
| markAndSweep(markOnly, forceBlobRetrieve); |
| } |
| |
| /** |
| * Returns the stats related to GC for all repos |
| * |
| * @return a list of GarbageCollectionRepoStats objects |
| * @throws Exception |
| */ |
| @Override |
| public List<GarbageCollectionRepoStats> getStats() throws Exception { |
| List<GarbageCollectionRepoStats> stats = newArrayList(); |
| if (SharedDataStoreUtils.isShared(blobStore)) { |
| // Get all the references available |
| List<DataRecord> refFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); |
| ImmutableListMultimap<String, DataRecord> references = |
| FluentIterable.from(refFiles).index(new Function<DataRecord, String>() { |
| @Override public String apply(DataRecord input) { |
| return SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString()); |
| |
| } |
| }); |
| |
| // Get all the markers available |
| List<DataRecord> markerFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()); |
| Map<String, DataRecord> markers = Maps.uniqueIndex(markerFiles, new Function<DataRecord, String>() { |
| @Override |
| public String apply(DataRecord input) { |
| return input.getIdentifier().toString().substring(SharedStoreRecordType.MARKED_START_MARKER.getType().length() + 1); |
| } |
| }); |
| |
| // Get all the repositories registered |
| List<DataRecord> repoFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()); |
| |
| for (DataRecord repoRec : repoFiles) { |
| String id = SharedStoreRecordType.REPOSITORY.getIdFromName(repoRec.getIdentifier().toString()); |
| |
| GarbageCollectionRepoStats stat = new GarbageCollectionRepoStats(); |
| stats.add(stat); |
| |
| stat.setRepositoryId(id); |
| if (id != null && id.equals(repoId)) { |
| stat.setLocal(true); |
| } |
| |
| if (references.containsKey(id)) { |
| ImmutableList<DataRecord> refRecs = references.get(id); |
| for(DataRecord refRec : refRecs) { |
| String uniqueSessionId = refRec.getIdentifier().toString() |
| .substring(SharedStoreRecordType.REFERENCES.getType().length() + 1); |
| |
| stat.setEndTime(refRec.getLastModified()); |
| stat.setLength(refRec.getLength()); |
| |
| if (markers.containsKey(uniqueSessionId)) { |
| stat.setStartTime(markers.get(uniqueSessionId).getLastModified()); |
| } |
| |
| LineNumberReader reader = null; |
| try { |
| reader = new LineNumberReader(new InputStreamReader(refRec.getStream())); |
| while (reader.readLine() != null) { |
| } |
| stat.setNumLines(reader.getLineNumber()); |
| } finally { |
| Closeables.close(reader, true); |
| } |
| } |
| } |
| } |
| } |
| return stats; |
| } |
| |
| @Override |
| public OperationsStatsMBean getOperationStats() { |
| return stats; |
| } |
| |
| @Override |
| public OperationsStatsMBean getConsistencyOperationStats() { |
| return consistencyStats; |
| } |
| |
| /** |
| * Mark and sweep. Main entry method for GC. |
| * |
| * @param markOnly whether to mark only |
| * @param forceBlobRetrieve force retrieve blob ids |
| * @throws Exception the exception |
| */ |
| protected void markAndSweep(boolean markOnly, boolean forceBlobRetrieve) throws Exception { |
| statsCollector.start(); |
| boolean threw = true; |
| GarbageCollectorFileState fs = new GarbageCollectorFileState(root); |
| Stopwatch sw = Stopwatch.createStarted(); |
| |
| try { |
| LOG.info("Starting Blob garbage collection with markOnly [{}]", markOnly); |
| |
| long markStart = System.currentTimeMillis(); |
| long markFinish; |
| try { |
| mark(fs); |
| } finally { |
| markFinish = sw.elapsed(TimeUnit.MILLISECONDS); |
| statsCollector.updateMarkDuration(markFinish, TimeUnit.MILLISECONDS); |
| LOG.info("Blob garbage collection Mark completed in {} ({} ms).", |
| sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS)); |
| } |
| |
| if (!markOnly) { |
| long deleteCount; |
| try { |
| deleteCount = sweep(fs, markStart, forceBlobRetrieve); |
| threw = false; |
| } finally { |
| sw.stop(); |
| statsCollector.updateSweepDuration(sw.elapsed(TimeUnit.MILLISECONDS) - markFinish, TimeUnit.MILLISECONDS); |
| } |
| |
| long maxTime = getMaxModifiedTime(markStart) > 0 ? getMaxModifiedTime(markStart) : markStart; |
| |
| LOG.info("Blob garbage collection completed in {} ({} ms). Number of blobs deleted [{}] with max modification time of [{}]", |
| sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS), deleteCount, timestampToString(maxTime)); |
| } |
| } catch (Exception e) { |
| statsCollector.finishFailure(); |
| LOG.error("Blob garbage collection error", e); |
| throw e; |
| } finally { |
| statsCollector.updateDuration(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); |
| if (!LOG.isTraceEnabled() && !traceOutput) { |
| Closeables.close(fs, threw); |
| } |
| } |
| } |
| |
| /** |
| * Mark phase of the GC. |
| * @param fs the garbage collector file state |
| */ |
| protected void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException { |
| LOG.debug("Starting mark phase of the garbage collector"); |
| |
| String uniqueSuffix = UUID.randomUUID().toString(); |
| |
| // Create a time marker in the data store if applicable |
| GarbageCollectionType.get(blobStore).addMarkedStartMarker(blobStore, repoId, uniqueSuffix); |
| |
| // Mark all used references |
| iterateNodeTree(fs, false); |
| |
| // Move the marked references file to the data store meta area if applicable |
| GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId, uniqueSuffix); |
| |
| LOG.debug("Ending mark phase of the garbage collector"); |
| } |
| |
| /** |
| * Difference phase where the GC candidates are identified. |
| * |
| * @param fs the garbage collector file state |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| private void difference(GarbageCollectorFileState fs) throws IOException { |
| LOG.debug("Starting difference phase of the garbage collector"); |
| |
| FileLineDifferenceIterator iter = new FileLineDifferenceIterator( |
| fs.getMarkedRefs(), |
| fs.getAvailableRefs(), |
| transformer); |
| int candidates = FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true); |
| LOG.debug("Found candidates - " + candidates); |
| |
| LOG.debug("Ending difference phase of the garbage collector"); |
| } |
| |
| /** |
| * Sweep phase of gc candidate deletion. |
| * <p> |
| * Performs the following steps depending upon the type of the blob store refer |
| * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type}: |
| * |
| * <ul> |
| * <li>Shared</li> |
| * <li> |
| * <ul> |
| * <li> Merge all marked references (from the mark phase run independently) available in the data store meta |
| * store (from all configured independent repositories). |
| * <li> Retrieve all blob ids available. |
| * <li> Diffs the 2 sets above to retrieve list of blob ids not used. |
| * <li> Deletes only blobs created after |
| * (earliest time stamp of the marked references - #maxLastModifiedInterval) from the above set. |
| * </ul> |
| * </li> |
| * |
| * <li>Default</li> |
| * <li> |
| * <ul> |
| * <li> Mark phase already run. |
| * <li> Retrieve all blob ids available. |
| * <li> Diffs the 2 sets above to retrieve list of blob ids not used. |
| * <li> Deletes only blobs created after |
| * (time stamp of the marked references - #maxLastModifiedInterval). |
| * </ul> |
| * </li> |
| * </ul> |
| * |
| * @return the number of blobs deleted |
| * @throws Exception the exception |
| * @param fs the garbage collector file state |
| * @param markStart the start time of mark to take as reference for deletion |
| * @param forceBlobRetrieve |
| */ |
| protected long sweep(GarbageCollectorFileState fs, long markStart, boolean forceBlobRetrieve) throws Exception { |
| long earliestRefAvailTime; |
| // Merge all the blob references available from all the reference files in the data store meta store |
| // Only go ahead if merge succeeded |
| try { |
| earliestRefAvailTime = |
| GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs); |
| LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime); |
| earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart); |
| } catch (Exception e) { |
| return 0; |
| } |
| |
| // Find all blob references after iterating over the whole repository |
| (new BlobIdRetriever(fs, forceBlobRetrieve)).call(); |
| |
| // Calculate the references not used |
| difference(fs); |
| long count = 0; |
| long deleted = 0; |
| |
| long maxModifiedTime = getMaxModifiedTime(earliestRefAvailTime); |
| LOG.debug("Starting sweep phase of the garbage collector"); |
| LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). ", |
| timestampToString(maxModifiedTime)); |
| |
| BufferedWriter removesWriter = null; |
| LineIterator iterator = null; |
| long deletedSize = 0; |
| int numDeletedSizeAvailable = 0; |
| try { |
| removesWriter = Files.newWriter(fs.getGarbage(), Charsets.UTF_8); |
| ArrayDeque<String> removesQueue = new ArrayDeque<String>(); |
| iterator = |
| FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name()); |
| |
| Iterator<List<String>> partitions = Iterators.partition(iterator, getBatchCount()); |
| while (partitions.hasNext()) { |
| List<String> ids = partitions.next(); |
| count += ids.size(); |
| deleted += BlobCollectionType.get(blobStore) |
| .sweepInternal(blobStore, ids, removesQueue, maxModifiedTime); |
| saveBatchToFile(newArrayList(removesQueue), removesWriter); |
| |
| for(String deletedId : removesQueue) { |
| // Estimate the size of the blob |
| long length = DataStoreBlobStore.BlobId.of(deletedId).getLength(); |
| if (length != -1) { |
| deletedSize += length; |
| numDeletedSizeAvailable += 1; |
| } |
| } |
| removesQueue.clear(); |
| } |
| } finally { |
| LineIterator.closeQuietly(iterator); |
| closeQuietly(removesWriter); |
| } |
| |
| BlobCollectionType.get(blobStore).handleRemoves(blobStore, fs.getGarbage(), fs.getMarkedRefs()); |
| |
| if(count != deleted) { |
| LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates identified. This may happen if blob " |
| + "modified time is > " |
| + "than the max deleted time ({})", deleted, count, |
| timestampToString(maxModifiedTime)); |
| } |
| |
| if (deletedSize > 0) { |
| LOG.info("Estimated size recovered for {} deleted blobs is {} ({} bytes)", |
| numDeletedSizeAvailable, |
| org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount(deletedSize), deletedSize); |
| } |
| |
| statsCollector.updateNumCandidates(count); |
| statsCollector.updateNumDeleted(deleted); |
| statsCollector.updateTotalSizeDeleted(deletedSize); |
| |
| // Remove all the merged marked references |
| GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore); |
| LOG.debug("Ending sweep phase of the garbage collector"); |
| return deleted; |
| } |
| |
| private int getBatchCount() { |
| return batchCount; |
| } |
| |
| /** |
| * 3 possibilities |
| * - If maxLastModifiedInterval <= 0 then return 0 which is interpreted as current by delete call |
| * (For testing purposes only) |
| * - If oldest checkpoint creation date > 0 then reference time is the earliest of that and the parameter |
| * maxModificationReferenceTime |
| * - Else the parameter maxModificationReferenceTime is used as the reference time |
| * |
| * @param maxModificationReferenceTime typically the mark phase start time (could be 0 for tests) |
| * @return max modified time of blobs to be considered for deletion |
| */ |
| private long getMaxModifiedTime(long maxModificationReferenceTime) { |
| if (maxLastModifiedInterval <= 0) { |
| return 0; |
| } |
| |
| long oldestCheckpoint = -1; |
| if (checkpointMbean != null) { |
| oldestCheckpoint = checkpointMbean.getOldestCheckpointCreationDate().getTime(); |
| LOG.debug("Oldest checkpoint data retrieved {} ", oldestCheckpoint); |
| } |
| LOG.debug("maxModificationReferenceTime {} ", maxModificationReferenceTime); |
| |
| maxModificationReferenceTime = maxModificationReferenceTime <= 0 ? |
| System.currentTimeMillis() : maxModificationReferenceTime; |
| long calculatedReferenceTime = (oldestCheckpoint <= 0 ? maxModificationReferenceTime : |
| Math.min(maxModificationReferenceTime, oldestCheckpoint)); |
| LOG.debug("Calculated reference time {} ", calculatedReferenceTime); |
| |
| return (calculatedReferenceTime - maxLastModifiedInterval); |
| } |
| |
| /** |
| * Save batch to file. |
| */ |
| static void saveBatchToFile(List<String> ids, BufferedWriter writer) throws IOException { |
| for (String id : ids) { |
| FileIOUtils.writeAsLine(writer, id, true); |
| } |
| writer.flush(); |
| } |
| |
| /** |
| * Iterates the complete node tree and collect all blob references |
| * @param fs the garbage collector file state |
| * @param logPath whether to log path in the file or not |
| */ |
| protected void iterateNodeTree(GarbageCollectorFileState fs, final boolean logPath) throws IOException { |
| final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8); |
| final AtomicInteger count = new AtomicInteger(); |
| try { |
| marker.collectReferences( |
| new ReferenceCollector() { |
| private final boolean debugMode = LOG.isTraceEnabled(); |
| |
| @Override |
| public void addReference(String blobId, final String nodeId) { |
| if (debugMode) { |
| LOG.trace("BlobId : {}, NodeId : {}", blobId, nodeId); |
| } |
| |
| try { |
| Iterator<String> idIter = blobStore.resolveChunks(blobId); |
| final Joiner delimJoiner = Joiner.on(DELIM).skipNulls(); |
| Iterator<List<String>> partitions = Iterators.partition(idIter, getBatchCount()); |
| while (partitions.hasNext()) { |
| List<String> idBatch = Lists.transform(partitions.next(), new Function<String, |
| String>() { |
| @Nullable @Override |
| public String apply(@Nullable String id) { |
| if (logPath) { |
| return delimJoiner.join(id, nodeId); |
| } |
| return id; |
| } |
| }); |
| if (debugMode) { |
| LOG.trace("chunkIds : {}", idBatch); |
| } |
| count.getAndAdd(idBatch.size()); |
| saveBatchToFile(idBatch, writer); |
| } |
| |
| if (count.get() % getBatchCount() == 0) { |
| LOG.info("Collected ({}) blob references", count.get()); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("Error in retrieving references", e); |
| } |
| } |
| } |
| ); |
| LOG.info("Number of valid blob references marked under mark phase of " + |
| "Blob garbage collection [{}]", count.get()); |
| // sort the marked references with the first part of the key |
| sort(fs.getMarkedRefs(), |
| new Comparator<String>() { |
| @Override |
| public int compare(String s1, String s2) { |
| return s1.split(DELIM)[0].compareTo(s2.split(DELIM)[0]); |
| } |
| }); |
| } finally { |
| closeQuietly(writer); |
| } |
| } |
| |
| /** |
| * Checks for the DataStore consistency and reports the number of missing blobs still referenced. |
| * |
| * @return the missing blobs |
| * @throws Exception |
| */ |
| @Override |
| public long checkConsistency() throws Exception { |
| consistencyStatsCollector.start(); |
| Stopwatch sw = Stopwatch.createStarted(); |
| |
| boolean threw = true; |
| GarbageCollectorFileState fs = new GarbageCollectorFileState(root); |
| long candidates = 0; |
| |
| try { |
| LOG.info("Starting blob consistency check"); |
| |
| // Find all blobs available in the blob store |
| ListenableFutureTask<Integer> blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever(fs, |
| true)); |
| executor.execute(blobIdRetriever); |
| |
| // Mark all used blob references |
| iterateNodeTree(fs, true); |
| consistencyStatsCollector.updateMarkDuration(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); |
| |
| try { |
| blobIdRetriever.get(); |
| } catch (ExecutionException e) { |
| LOG.warn("Error occurred while fetching all the blobIds from the BlobStore"); |
| threw = false; |
| throw e; |
| } |
| |
| LOG.trace("Starting difference phase of the consistency check"); |
| FileLineDifferenceIterator iter = new FileLineDifferenceIterator( |
| fs.getAvailableRefs(), |
| fs.getMarkedRefs(), |
| transformer); |
| // If tracking then also filter ids being tracked which are active deletions for lucene |
| candidates = BlobCollectionType.get(blobStore).filter(blobStore, iter, fs); |
| |
| LOG.trace("Ending difference phase of the consistency check"); |
| LOG.info("Consistency check found [{}] missing blobs", candidates); |
| |
| if (candidates > 0) { |
| LOG.warn("Consistency check failure in the the blob store : {}, check missing candidates in file {}", |
| blobStore, fs.getGcCandidates().getAbsolutePath()); |
| consistencyStatsCollector.finishFailure(); |
| consistencyStatsCollector.updateNumDeleted(candidates); |
| } |
| } finally { |
| if (!traceOutput && (!LOG.isTraceEnabled() && candidates == 0)) { |
| Closeables.close(fs, threw); |
| } |
| sw.stop(); |
| consistencyStatsCollector.updateDuration(sw.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); |
| } |
| return candidates; |
| } |
| |
| public void setTraceOutput(boolean trace) { |
| traceOutput = trace; |
| } |
| |
| /** |
| * BlobIdRetriever class to retrieve all blob ids. |
| */ |
| private class BlobIdRetriever implements Callable<Integer> { |
| private final GarbageCollectorFileState fs; |
| private final boolean forceRetrieve; |
| |
| public BlobIdRetriever(GarbageCollectorFileState fs, boolean forceBlobRetrieve) { |
| this.fs = fs; |
| this.forceRetrieve = forceBlobRetrieve; |
| } |
| |
| @Override |
| public Integer call() throws Exception { |
| if (!forceRetrieve) { |
| BlobCollectionType.get(blobStore).retrieve(blobStore, fs, getBatchCount()); |
| LOG.info("Length of blob ids file retrieved from tracker {}", fs.getAvailableRefs().length()); |
| } |
| |
| // If the length is 0 then references not available from the tracker |
| // retrieve from the data store |
| if (fs.getAvailableRefs().length() <= 0) { |
| BlobCollectionType.DEFAULT.retrieve(blobStore, fs, getBatchCount()); |
| LOG.info("Length of blob ids file retrieved {}", fs.getAvailableRefs().length()); |
| |
| BlobCollectionType.get(blobStore).track(blobStore, fs); |
| } |
| return 0; |
| } |
| } |
| |
| /** |
| * Provides a readable string for given timestamp |
| */ |
| private static String timestampToString(long timestamp){ |
| return (new Timestamp(timestamp) + "00").substring(0, 23); |
| } |
| |
| /** |
| * Defines different data store types from the garbage collection perspective and encodes the divergent behavior. |
| * <ul></ul> |
| */ |
| enum GarbageCollectionType { |
| SHARED { |
| /** |
| * Remove the maked references and the marked markers from the blob store root. Default NOOP. |
| * |
| * @param blobStore the blobStore instance |
| */ |
| @Override |
| void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) { |
| ((SharedDataStore) blobStore).deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); |
| ((SharedDataStore) blobStore).deleteAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()); |
| } |
| |
| /** |
| * Merge all marked references available from all repositories and return the earliest time of the references. |
| * |
| * @param blobStore the blob store |
| * @param fs the fs |
| * @return the long the earliest time of the available references |
| * @throws IOException Signals that an I/O exception has occurred. |
| * @throws DataStoreException the data store exception |
| */ |
| @Override |
| long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, |
| GarbageCollectorFileState fs) |
| throws IOException, DataStoreException { |
| |
| List<DataRecord> refFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()); |
| |
| // Get all the repositories registered |
| List<DataRecord> repoFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()); |
| |
| // Retrieve repos for which reference files have not been created |
| Set<String> unAvailRepos = |
| SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles); |
| if (unAvailRepos.isEmpty()) { |
| // List of files to be merged |
| List<File> files = newArrayList(); |
| for (DataRecord refFile : refFiles) { |
| File file = copy(refFile.getStream()); |
| files.add(file); |
| } |
| |
| merge(files, fs.getMarkedRefs()); |
| |
| // Get the timestamp to indicate the earliest mark phase start |
| List<DataRecord> markerFiles = |
| ((SharedDataStore) blobStore).getAllMetadataRecords( |
| SharedStoreRecordType.MARKED_START_MARKER.getType()); |
| long earliestMarker = SharedDataStoreUtils.getEarliestRecord(markerFiles).getLastModified(); |
| LOG.trace("Earliest marker timestamp {}", earliestMarker); |
| |
| long earliestRef = SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified(); |
| LOG.trace("Earliest ref timestamp {}", earliestRef); |
| |
| return (earliestMarker < earliestRef ? earliestMarker : earliestRef); |
| } else { |
| LOG.error("Not all repositories have marked references available : {}", unAvailRepos); |
| throw new IOException("Not all repositories have marked references available"); |
| } |
| } |
| |
| /** |
| * Adds the marked references to the blob store root. Default NOOP |
| * |
| * @param blobStore the blob store |
| * @param fs the fs |
| * @param repoId the repo id |
| * @param uniqueSuffix the unique session suffix |
| * @throws DataStoreException the data store exception |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| @Override |
| void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, String repoId, |
| String uniqueSuffix) throws DataStoreException, IOException { |
| boolean exists = ((SharedDataStore) blobStore) |
| .metadataRecordExists(SharedStoreRecordType.REFERENCES.getNameFromId(repoId)); |
| if (exists) { |
| LOG.info("References for repository id {} already exists. Creating a duplicate one. " |
| + "Please check for inadvertent sharing of repository id by different repositories", repoId); |
| } |
| |
| ((SharedDataStore) blobStore).addMetadataRecord(fs.getMarkedRefs(), SharedStoreRecordType.REFERENCES |
| .getNameFromIdPrefix(repoId, uniqueSuffix)); |
| } |
| |
| @Override |
| public void addMarkedStartMarker(GarbageCollectableBlobStore blobStore, String repoId, |
| String uniqueSuffix) { |
| try { |
| ((SharedDataStore) blobStore).addMetadataRecord(new ByteArrayInputStream(new byte[0]), |
| SharedStoreRecordType.MARKED_START_MARKER |
| .getNameFromIdPrefix(repoId, uniqueSuffix)); |
| } catch (DataStoreException e) { |
| LOG.debug("Error creating marked time marker for repo : {}", repoId); |
| } |
| } |
| }, |
| DEFAULT; |
| |
| void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) {} |
| |
| void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, String repoId, |
| String uniqueSuffix) throws DataStoreException, IOException {} |
| |
| long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, |
| GarbageCollectorFileState fs) |
| throws IOException, DataStoreException { |
| // throw id the marked refs not available. |
| if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() == 0) { |
| throw new IOException("Marked references not available"); |
| } |
| return fs.getMarkedRefs().lastModified(); |
| } |
| |
| public static GarbageCollectionType get(GarbageCollectableBlobStore blobStore) { |
| if (SharedDataStoreUtils.isShared(blobStore)) { |
| return SHARED; |
| } |
| return DEFAULT; |
| } |
| |
| public void addMarkedStartMarker(GarbageCollectableBlobStore blobStore, String repoId, String uniqueSuffix) {} |
| } |
| |
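/**
 * Defines the different blob collection types and encodes their divergent behavior.
 * <ul>
 *     <li>TRACKER - used when the blob store is a BlobTrackingStore with a tracker available.</li>
 *     <li>DEFAULT - used for every other blob store.</li>
 * </ul>
 */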
| private enum BlobCollectionType { |
| TRACKER { |
| @Override |
| void retrieve(GarbageCollectableBlobStore blobStore, |
| GarbageCollectorFileState fs, int batchCount) throws Exception { |
| ((BlobTrackingStore) blobStore).getTracker() |
| .get(fs.getAvailableRefs().getAbsolutePath()); |
| } |
| |
| @Override |
| void handleRemoves(GarbageCollectableBlobStore blobStore, File removedIds, File markedRefs) throws IOException { |
| BlobTrackingStore store = (BlobTrackingStore) blobStore; |
| BlobIdTracker tracker = (BlobIdTracker) store.getTracker(); |
| tracker.remove(removedIds); |
| tracker.getDeleteTracker().reconcile(markedRefs); |
| } |
| |
| @Override |
| void track(GarbageCollectableBlobStore blobStore, |
| GarbageCollectorFileState fs) { |
| try { |
| File f = File.createTempFile("blobiddownload", null); |
| copyFile(fs.getAvailableRefs(), f); |
| ((BlobTrackingStore) blobStore).getTracker().add(f); |
| } catch (IOException e) { |
| LOG.warn("Unable to track blob ids locally"); |
| } |
| } |
| |
| @Override |
| public int filter(GarbageCollectableBlobStore blobStore, FileLineDifferenceIterator iter, |
| GarbageCollectorFileState fs) throws IOException { |
| // Write the original candidates |
| FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true); |
| |
| // Filter the ids actively deleted |
| BlobTrackingStore store = (BlobTrackingStore) blobStore; |
| BlobIdTracker tracker = (BlobIdTracker) store.getTracker(); |
| |
| // Move the candidates identified to a temp file |
| File candTemp = createTempFile("candTemp", null); |
| copyFile(fs.getGcCandidates(), candTemp); |
| |
| Iterator<String> filter = tracker.getDeleteTracker().filter(candTemp); |
| try { |
| return FileIOUtils.writeStrings(filter, fs.getGcCandidates(), true); |
| } finally { |
| if (filter != null && filter instanceof FileLineDifferenceIterator) { |
| ((FileLineDifferenceIterator) filter).close(); |
| } |
| |
| if (candTemp != null) { |
| candTemp.delete(); |
| } |
| } |
| } |
| }, |
| DEFAULT; |
| |
| /** |
| * Deletes the given batch by deleting individually to exactly know the actual deletes. |
| */ |
| long sweepInternal(GarbageCollectableBlobStore blobStore, List<String> ids, |
| ArrayDeque<String> exceptionQueue, long maxModified) { |
| long totalDeleted = 0; |
| LOG.trace("Blob ids to be deleted {}", ids); |
| for (String id : ids) { |
| try { |
| long deleted = blobStore.countDeleteChunks(newArrayList(id), maxModified); |
| if (deleted != 1) { |
| LOG.debug("Blob [{}] not deleted", id); |
| } else { |
| exceptionQueue.add(id); |
| totalDeleted += 1; |
| } |
| } catch (Exception e) { |
| LOG.warn("Error occurred while deleting blob with id [{}]", id, e); |
| } |
| } |
| return totalDeleted; |
| } |
| |
| /** |
| * Retrieve the put the list of available blobs in the file. |
| * |
| * @param blobStore |
| * @param fs |
| * @param batchCount |
| * @throws Exception |
| */ |
| void retrieve(GarbageCollectableBlobStore blobStore, |
| GarbageCollectorFileState fs, int batchCount) throws Exception { |
| LOG.debug("Starting retrieve of all blobs"); |
| int blobsCount = 0; |
| Iterator<String> idsIter = null; |
| try { |
| idsIter = blobStore.getAllChunkIds(0); |
| blobsCount = FileIOUtils.writeStrings(idsIter, fs.getAvailableRefs(), true, LOG, "Retrieved blobs - "); |
| |
| // sort the file |
| sort(fs.getAvailableRefs()); |
| LOG.info("Number of blobs present in BlobStore : [{}] ", blobsCount); |
| } finally { |
| if (idsIter instanceof Closeable) { |
| try { |
| Closeables.close((Closeable) idsIter, false); |
| } catch (Exception e) { |
| LOG.debug("Error closing iterator"); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Hook to handle all the removed ids. |
| * |
| * @param blobStore |
| * @param removedIds |
| * @param markedRefs |
| * @throws IOException |
| */ |
| void handleRemoves(GarbageCollectableBlobStore blobStore, File removedIds, File markedRefs) throws IOException { |
| FileUtils.forceDelete(removedIds); |
| } |
| |
| /** |
| * Tracker may want to track this file |
| * |
| * @param blobStore |
| * @param fs |
| */ |
| void track(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs) { |
| } |
| |
| public static BlobCollectionType get(GarbageCollectableBlobStore blobStore) { |
| if (blobStore instanceof BlobTrackingStore) { |
| BlobTracker tracker = ((BlobTrackingStore) blobStore).getTracker(); |
| if (tracker != null) { |
| return TRACKER; |
| } |
| } |
| return DEFAULT; |
| } |
| |
| public int filter(GarbageCollectableBlobStore blobStore, FileLineDifferenceIterator iter, |
| GarbageCollectorFileState fs) throws IOException { |
| return FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true); |
| } |
| } |
| |
| class GarbageCollectionOperationStats implements OperationsStatsMBean { |
| static final String NAME = "DataStoreGarbageCollection"; |
| static final String CONSISTENCY_NAME = "DataStoreConsistencyCheck"; |
| static final String START = "COUNTER"; |
| static final String FINISH_FAILURE = "FAILURE"; |
| static final String DURATION = "ACTIVE_TIMER"; |
| static final String MARK_DURATION = "MARK_TIMER"; |
| static final String SWEEP_DURATION = "SWEEP_TIMER"; |
| static final String NUM_BLOBS_DELETED = "NUM_BLOBS_DELETED"; |
| static final String TOTAL_SIZE_DELETED = "TOTAL_SIZE_DELETED"; |
| static final String NUM_CANDIDATES = "NUM_CANDIDATES"; |
| |
| private final String typeName; |
| |
| private CounterStats startCounter; |
| private CounterStats finishFailureCounter; |
| private CounterStats numDeletedCounter; |
| private CounterStats totalSizeDeletedCounter; |
| private CounterStats numCandidatesCounter; |
| private TimerStats duration; |
| private final TimerStats markDuration; |
| private final TimerStats sweepDuration; |
| private final OperationStatsCollector collector; |
| |
| GarbageCollectionOperationStats(StatisticsProvider sp, String typeName) { |
| this.typeName = typeName; |
| |
| this.startCounter = sp.getCounterStats(getMetricName(START), StatsOptions.METRICS_ONLY); |
| this.finishFailureCounter = sp.getCounterStats(getMetricName(FINISH_FAILURE), StatsOptions.METRICS_ONLY); |
| this.numDeletedCounter = sp.getCounterStats(getMetricName(NUM_BLOBS_DELETED), StatsOptions.METRICS_ONLY); |
| this.totalSizeDeletedCounter = sp.getCounterStats(getMetricName(TOTAL_SIZE_DELETED), StatsOptions.METRICS_ONLY); |
| this.numCandidatesCounter = sp.getCounterStats(getMetricName(NUM_CANDIDATES), StatsOptions.METRICS_ONLY); |
| this.duration = sp.getTimer(getMetricName(DURATION), StatsOptions.METRICS_ONLY); |
| this.markDuration = sp.getTimer(getMetricName(MARK_DURATION), StatsOptions.METRICS_ONLY); |
| this.sweepDuration = sp.getTimer(getMetricName(SWEEP_DURATION), StatsOptions.METRICS_ONLY); |
| |
| this.collector = new OperationStatsCollector() { |
| @Override |
| public void start() { |
| startCounter.inc(); |
| } |
| |
| @Override |
| public void finishFailure() { |
| finishFailureCounter.inc(); |
| } |
| |
| @Override |
| public void updateNumDeleted(long num) { |
| numDeletedCounter.inc(num); |
| } |
| |
| @Override |
| public void updateNumCandidates(long num) { |
| numCandidatesCounter.inc(num); |
| } |
| |
| @Override |
| public void updateTotalSizeDeleted(long size) { |
| totalSizeDeletedCounter.inc(size); |
| } |
| |
| @Override |
| public void updateDuration(long time, TimeUnit timeUnit) { |
| duration.update(time, timeUnit); |
| } |
| |
| @Override public void updateMarkDuration(long time, TimeUnit timeUnit) { |
| markDuration.update(time, timeUnit); |
| } |
| |
| @Override public void updateSweepDuration(long time, TimeUnit timeUnit) { |
| sweepDuration.update(time, timeUnit); |
| } |
| }; |
| } |
| |
| |
| GarbageCollectionOperationStats(StatisticsProvider sp) { |
| this(sp, NAME); |
| } |
| |
| private String getMetricName(String name) { |
| return getName() + "." + name; |
| } |
| |
| protected OperationStatsCollector getCollector() { |
| return collector; |
| } |
| |
| @Override public String getName() { |
| return TYPE + "." + typeName; |
| } |
| |
| @Override public long getStartCount() { |
| return startCounter.getCount(); |
| } |
| |
| @Override public long getFailureCount() { |
| return finishFailureCounter.getCount(); |
| } |
| |
| @Override public long duration() { |
| return duration.getCount(); |
| } |
| |
| @Override public long markDuration() { |
| return markDuration.getCount(); |
| } |
| |
| @Override public long numDeleted() { |
| return numDeletedCounter.getCount(); |
| } |
| |
| @Override public long sizeDeleted() { |
| return totalSizeDeletedCounter.getCount(); |
| } |
| } |
| } |