| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.segment.file.tar; |
| |
| import static com.google.common.collect.Lists.newArrayList; |
| import static com.google.common.collect.Maps.newLinkedHashMap; |
| import static com.google.common.collect.Maps.newTreeMap; |
| import static com.google.common.collect.Sets.newHashSet; |
| import static java.util.Collections.singletonList; |
| import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration; |
| |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.UUID; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.base.Predicate; |
| import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader; |
| import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; |
| import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; |
| import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex; |
| import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException; |
| import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; |
| import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TarReader implements Closeable { |
| |
| private static final Logger log = LoggerFactory.getLogger(TarReader.class); |
| |
| static TarReader open(String file, SegmentArchiveManager archiveManager) throws IOException { |
| TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); |
| if (reader != null) { |
| return reader; |
| } else { |
| throw new IOException("Failed to open tar file " + file); |
| } |
| } |
| |
| /** |
| * Creates a {@link TarReader} instance for reading content from a tar file. |
| * If there exist multiple generations of the same tar file, they are all |
| * passed to this method. The latest generation with a valid tar index |
| * (which is a good indication of general validity of the file) is opened |
| * and the other generations are removed to clean things up. If none of the |
| * generations has a valid index, then something must have gone wrong and |
| * we'll try recover as much content as we can from the existing tar |
| * generations. |
| * |
| * @param files The generations of the same TAR file. |
| * @param recovery Strategy for recovering a damaged TAR file. |
| * @return An instance of {@link TarReader}. |
| */ |
| static TarReader open(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { |
| SortedMap<Character, String> sorted = newTreeMap(); |
| sorted.putAll(files); |
| |
| List<String> list = newArrayList(sorted.values()); |
| Collections.reverse(list); |
| |
| TarReader reader = openFirstFileWithValidIndex(list, archiveManager); |
| if (reader != null) { |
| return reader; |
| } |
| |
| // no generation has a valid index, so recover as much as we can |
| log.warn("Could not find a valid tar index in {}, recovering...", list); |
| LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap(); |
| for (String file : sorted.values()) { |
| collectFileEntries(file, entries, true, archiveManager); |
| } |
| |
| // regenerate the first generation based on the recovered data |
| String file = sorted.values().iterator().next(); |
| generateTarFile(entries, file, recovery, archiveManager); |
| |
| reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); |
| if (reader != null) { |
| return reader; |
| } else { |
| throw new IOException("Failed to open recovered tar file " + file); |
| } |
| } |
| |
| static TarReader openRO(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { |
| // for readonly store only try the latest generation of a given |
| // tar file to prevent any rollback or rewrite |
| String file = files.get(Collections.max(files.keySet())); |
| TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); |
| if (reader != null) { |
| return reader; |
| } |
| log.warn("Could not find a valid tar index in {}, recovering read-only", file); |
| // collecting the entries (without touching the original file) and |
| // writing them into an artificial tar file '.ro.bak' |
| LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap(); |
| collectFileEntries(file, entries, false, archiveManager); |
| file = findAvailGen(file, ".ro.bak", archiveManager); |
| generateTarFile(entries, file, recovery, archiveManager); |
| reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); |
| if (reader != null) { |
| return reader; |
| } |
| throw new IOException("Failed to open tar file " + file); |
| } |
| |
    /**
     * Collects all entries from the given file and optionally backs-up the
     * file, by renaming it to a ".bak" extension.
     *
     * @param file    The TAR file.
     * @param entries The map where the recovered entries will be collected
     *                into.
     * @param backup  If {@code true}, performs a backup of the TAR file.
     */
    private static void collectFileEntries(String file, LinkedHashMap<UUID, byte[]> entries, boolean backup, SegmentArchiveManager archiveManager) throws IOException {
        log.info("Recovering segments from tar file {}", file);
        try {
            archiveManager.recoverEntries(file, entries);
        } catch (IOException e) {
            // Best effort: an unreadable generation is logged and skipped so
            // that recovery can continue with the remaining generations.
            log.warn("Could not read tar file {}, skipping...", file, e);
        }

        if (backup) {
            backupSafely(archiveManager, file);
        }
    }
| |
| /** |
| * Regenerates a tar file from a list of entries. |
| * |
| * @param entries Map of entries to recover. The entries will be recovered |
| * in the iteration order of this {@link LinkedHashMap}. |
| * @param file The output file that will contain the recovered |
| * entries. |
| * @param recovery The recovery strategy to execute. |
| */ |
| private static void generateTarFile(LinkedHashMap<UUID, byte[]> entries, String file, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { |
| log.info("Regenerating tar file {}", file); |
| |
| try (TarWriter writer = new TarWriter(archiveManager, file)) { |
| for (Entry<UUID, byte[]> entry : entries.entrySet()) { |
| try { |
| recovery.recoverEntry(entry.getKey(), entry.getValue(), new EntryRecovery() { |
| |
| @Override |
| public void recoverEntry(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException { |
| writer.writeEntry(msb, lsb, data, offset, size, generation); |
| } |
| |
| @Override |
| public void recoverGraphEdge(UUID from, UUID to) { |
| writer.addGraphEdge(from, to); |
| } |
| |
| @Override |
| public void recoverBinaryReference(GCGeneration generation, UUID segmentId, String reference) { |
| writer.addBinaryReference(generation, segmentId, reference); |
| } |
| |
| }); |
| } catch (IOException e) { |
| throw new IOException(String.format("Unable to recover entry %s for file %s", entry.getKey(), file), e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Backup this tar file for manual inspection. Something went wrong earlier |
| * so we want to prevent the data from being accidentally removed or |
| * overwritten. |
| * |
| * @param file File to backup. |
| */ |
| private static void backupSafely(SegmentArchiveManager archiveManager, String file) throws IOException { |
| String backup = findAvailGen(file, ".bak", archiveManager); |
| log.info("Backing up {} to {}", file, backup); |
| if (!archiveManager.renameTo(file, backup)) { |
| log.warn("Renaming failed, so using copy to backup {}", file); |
| archiveManager.copyFile(file, backup); |
| if (!archiveManager.delete(file)) { |
| throw new IOException( |
| "Could not remove broken tar file " + file); |
| } |
| } |
| } |
| |
| /** |
| * Fine next available generation number so that a generated file doesn't |
| * overwrite another existing file. |
| * |
| * @param name The file to backup. |
| * @param ext The extension of the backed up file. |
| */ |
| private static String findAvailGen(String name, String ext, SegmentArchiveManager archiveManager) { |
| String backup = name + ext; |
| for (int i = 2; archiveManager.exists(backup); i++) { |
| backup = name + "." + i + ext; |
| } |
| return backup; |
| } |
| |
| private static TarReader openFirstFileWithValidIndex(List<String> archives, SegmentArchiveManager archiveManager) { |
| for (String name : archives) { |
| try { |
| SegmentArchiveReader reader = archiveManager.open(name); |
| if (reader != null) { |
| for (String other : archives) { |
| if (other != name) { |
| log.info("Removing unused tar file {}", other); |
| archiveManager.delete(other); |
| } |
| } |
| return new TarReader(archiveManager, reader); |
| } |
| } catch (IOException e) { |
| log.warn("Could not read tar file {}, skipping...", name, e); |
| } |
| } |
| |
| return null; |
| } |
| |
    /** Manager used for file-level operations (open, rename, delete) on archives. */
    private final SegmentArchiveManager archiveManager;

    /** The underlying archive this reader reads segments from. */
    private final SegmentArchiveReader archive;

    /** Identifiers of all segments in this archive, cached at construction time. */
    private final Set<UUID> segmentUUIDs;

    // NOTE(review): this field is never read or written in this file —
    // presumably a leftover; confirm against the rest of the codebase before
    // removing. The hasGraph() method below delegates to the archive instead.
    private volatile boolean hasGraph;

    private TarReader(SegmentArchiveManager archiveManager, SegmentArchiveReader archive) {
        this.archiveManager = archiveManager;
        this.archive = archive;
        // Cache the segment ids up front so getUUIDs() needs no further I/O.
        this.segmentUUIDs = archive.listSegments()
                .stream()
                .map(e -> new UUID(e.getMsb(), e.getLsb()))
                .collect(Collectors.toSet());
    }
| |
    /** Returns the size in bytes of the backing archive. */
    long size() {
        return archive.length();
    }
| |
    /**
     * Returns the identifier of every segment included in the index of this
     * TAR file. The set is computed once, in the constructor, from the
     * archive's segment listing; this accessor performs no I/O.
     *
     * @return An instance of {@link Set}.
     */
    Set<UUID> getUUIDs() {
        return segmentUUIDs;
    }
| |
    /**
     * Check if the requested entry exists in this TAR file. Delegates directly
     * to the underlying archive.
     *
     * @param msb The most significant bits of the entry identifier.
     * @param lsb The least significant bits of the entry identifier.
     * @return {@code true} if the entry exists in this TAR file, {@code false}
     *         otherwise.
     */
    boolean containsEntry(long msb, long lsb) {
        return archive.containsSegment(msb, lsb);
    }
| |
    /**
     * If the given segment is in this file, get the byte buffer that allows
     * reading it.
     * <p>
     * Whether or not this will read from the file depends on whether memory
     * mapped files are used or not.
     *
     * @param msb the most significant bits of the segment id
     * @param lsb the least significant bits of the segment id
     * @return the byte buffer, or null if not in this file.
     */
    ByteBuffer readEntry(long msb, long lsb) throws IOException {
        return archive.readSegment(msb, lsb);
    }
| |
| /** |
| * Read the entries in this TAR file. |
| * |
| * @return An array of {@link IndexEntry}. |
| */ |
| @NotNull |
| SegmentArchiveEntry[] getEntries() { |
| List<SegmentArchiveEntry> entryList = archive.listSegments(); |
| return entryList.toArray(new SegmentArchiveEntry[entryList.size()]); |
| } |
| |
| /** |
| * Read the references of an entry in this TAR file. |
| * |
| * @param id The identifier of the entry. |
| * @param graph The content of the graph of this TAR file. |
| * @return The references of the provided TAR entry. |
| */ |
| @NotNull |
| private static List<UUID> getReferences(UUID id, Map<UUID, List<UUID>> graph) { |
| List<UUID> references = graph.get(id); |
| |
| if (references == null) { |
| return Collections.emptyList(); |
| } |
| |
| return references; |
| } |
| |
| /** |
| * Collect the references of those BLOBs that are reachable from the entries |
| * in this TAR file. |
| * <p> |
| * The user-provided {@link Predicate} determines if entries belonging to a |
| * specific generation should be inspected for binary references of not. |
| * Given a generation number as input, if the predicate returns {@code |
| * true}, entries from that generation will be skipped. If the predicate |
| * returns {@code false}, entries from that generation will be inspected for |
| * references. |
| * <p> |
| * The provided {@link Consumer} is callback object that will be invoked for |
| * every reference found in the inspected entries. |
| * |
| * @param collector An instance of {@link Consumer}. |
| * @param skipGeneration An instance of {@link Predicate}. |
| */ |
| void collectBlobReferences(@NotNull Consumer<String> collector, Predicate<GCGeneration> skipGeneration) { |
| BinaryReferencesIndex references = getBinaryReferences(); |
| |
| if (references == null) { |
| return; |
| } |
| |
| references.forEach((generation, full, compacted, segment, reference) -> { |
| if (skipGeneration.apply(newGCGeneration(generation, full, compacted))) { |
| return; |
| } |
| collector.accept(reference); |
| }); |
| } |
| |
    /**
     * Mark entries that can be reclaimed.
     * <p>
     * A data segment is reclaimable iff its generation is in the {@code
     * reclaimGeneration} predicate. A bulk segment is reclaimable if it is not
     * in {@code bulkRefs} or if it is transitively reachable through a non
     * reclaimable data segment.
     * <p>
     * The algorithm implemented by this method uses a couple of supporting
     * data structures.
     * <p>
     * The first of the supporting data structures is the set of bulk segments
     * to keep. When this method is invoked, this set initially contains the
     * set of bulk segments that are currently in use. The algorithm removes a
     * reference from this set if the corresponding bulk segment is not
     * referenced (either directly or transitively) from a marked data segment.
     * The algorithm adds a reference to this set if a marked data segment
     * references the corresponding bulk segment. When this method returns, the
     * references in this set represent bulk segments that are currently in use
     * and should not be removed.
     * <p>
     * The second of the supporting data structures is the set of segments to
     * reclaim. This set contains references to bulk and data segments. A
     * reference to a bulk segment is added if the bulk segment is not
     * referenced (either directly or transitively) by a marked data segment. A
     * reference to a data segment is added if the user-provided predicate
     * returns {@code true} for that segment. When this method returns, this
     * set contains segments that are not marked and can be removed.
     *
     * @param references  The set of bulk segments to keep.
     * @param reclaimable The set of segments to remove.
     * @param context     An instance of {@link CleanupContext}.
     */
    void mark(Set<UUID> references, Set<UUID> reclaimable, CleanupContext context) throws IOException {
        Map<UUID, List<UUID>> graph = getGraph();
        SegmentArchiveEntry[] entries = getEntries();
        for (int i = entries.length - 1; i >= 0; i--) {
            // A bulk segment is *always* written before any data segment
            // referencing it. Backward iteration ensures we see all references
            // to bulk segments before we see the bulk segment itself.
            // Therefore we can remove a bulk reference from the bulkRefs set
            // once we encounter it, which saves us some memory and CPU on
            // subsequent look-ups.
            SegmentArchiveEntry entry = entries[i];
            UUID id = new UUID(entry.getMsb(), entry.getLsb());
            GCGeneration generation = GCGeneration.newGCGeneration(entry);
            if (context.shouldReclaim(id, generation, references.remove(id))) {
                reclaimable.add(id);
            } else {
                // Keep this entry and follow its outgoing graph edges so the
                // segments it references are marked as in use as well.
                for (UUID refId : getReferences(id, graph)) {
                    if (context.shouldFollow(id, refId)) {
                        references.add(refId);
                    }
                }
            }
        }
    }
| |
| /** |
| * Try to remove every segment contained in a user-provided set. |
| * <p> |
| * This method might refuse to remove the segments under the following |
| * circumstances. |
| * <p> |
| * First, if this TAR files does not contain any of the segments that are |
| * supposed to be removed. In this case, the method returns {@code null}. |
| * <p> |
| * Second, if this method contains some of the segments that are supposed to |
| * be removed, but the reclaimable space is be less than 1/4 of the current |
| * size of the TAR file. In this case, this method returns this {@link |
| * TarReader}. |
| * <p> |
| * Third, if this TAR file is in the highest generation possible ('z') and |
| * thus a new generation for this TAR file can't be created. In this case, |
| * the method returns this {@link TarReader}. |
| * <p> |
| * Fourth, if a new TAR file has been created but it is unreadable for |
| * unknown reasons. In this case, this method returns this {@link |
| * TarReader}. |
| * <p> |
| * If none of the above conditions apply, this method returns a new {@link |
| * TarReader} instance tha points to a TAR file that doesn't contain the |
| * removed segments. The returned {@link TarReader} will belong to the next |
| * generation of this {@link TarReader}. In this case, the {@code reclaimed} |
| * set will be updated to contain the identifiers of the segments that were |
| * removed from this TAR file. |
| * |
| * @param reclaim Set of segment sto reclaim. |
| * @param reclaimed Set of reclaimed segments. It will be update if this TAR |
| * file is rewritten. |
| * @return Either this {@link TarReader}, or a new instance of {@link |
| * TarReader}, or {@code null}. |
| */ |
| TarReader sweep(@NotNull Set<UUID> reclaim, @NotNull Set<UUID> reclaimed) throws IOException { |
| String name = archive.getName(); |
| log.debug("Cleaning up {}", name); |
| |
| Set<UUID> cleaned = newHashSet(); |
| int afterSize = 0; |
| int beforeSize = 0; |
| int afterCount = 0; |
| |
| SegmentArchiveEntry[] entries = getEntries(); |
| for (int i = 0; i < entries.length; i++) { |
| SegmentArchiveEntry entry = entries[i]; |
| beforeSize += archive.getEntrySize(entry.getLength()); |
| UUID id = new UUID(entry.getMsb(), entry.getLsb()); |
| if (reclaim.contains(id)) { |
| cleaned.add(id); |
| entries[i] = null; |
| } else { |
| afterSize += archive.getEntrySize(entry.getLength()); |
| afterCount += 1; |
| } |
| } |
| |
| if (afterCount == 0) { |
| log.debug("None of the entries of {} are referenceable.", name); |
| return null; |
| } |
| if (afterSize >= beforeSize * 3 / 4 && hasGraph()) { |
| // the space savings are not worth it at less than 25%, |
| // unless this tar file lacks a pre-compiled segment graph |
| // in which case we'll always generate a new tar file with |
| // the graph to speed up future garbage collection runs. |
| log.debug("Not enough space savings. ({}/{}). Skipping clean up of {}", |
| archive.length() - afterSize, archive.length(), name); |
| return this; |
| } |
| if (!hasGraph()) { |
| log.warn("Recovering {}, which is missing its graph.", name); |
| } |
| |
| int pos = name.length() - "a.tar".length(); |
| char generation = name.charAt(pos); |
| if (generation == 'z') { |
| log.debug("No garbage collection after reaching generation z: {}", name); |
| return this; |
| } |
| |
| String newFile = name.substring(0, pos) + (char) (generation + 1) + ".tar"; |
| |
| log.debug("Writing new generation {}", newFile); |
| TarWriter writer = new TarWriter(archiveManager, newFile); |
| for (SegmentArchiveEntry entry : entries) { |
| if (entry != null) { |
| long msb = entry.getMsb(); |
| long lsb = entry.getLsb(); |
| int size = entry.getLength(); |
| GCGeneration gen = GCGeneration.newGCGeneration(entry); |
| byte[] data = new byte[size]; |
| archive.readSegment(msb, lsb).get(data); |
| writer.writeEntry(msb, lsb, data, 0, size, gen); |
| } |
| } |
| |
| // Reconstruct the graph index for non-cleaned segments. |
| |
| Map<UUID, List<UUID>> graph = getGraph(); |
| |
| for (Entry<UUID, List<UUID>> e : graph.entrySet()) { |
| if (cleaned.contains(e.getKey())) { |
| continue; |
| } |
| |
| Set<UUID> vertices = newHashSet(); |
| |
| for (UUID vertex : e.getValue()) { |
| if (cleaned.contains(vertex)) { |
| continue; |
| } |
| |
| vertices.add(vertex); |
| } |
| |
| for (UUID vertex : vertices) { |
| writer.addGraphEdge(e.getKey(), vertex); |
| } |
| } |
| |
| // Reconstruct the binary reference index for non-cleaned segments. |
| |
| BinaryReferencesIndex references = getBinaryReferences(); |
| |
| if (references != null) { |
| references.forEach((gen, full, compacted, id, reference) -> { |
| if (cleaned.contains(id)) { |
| return; |
| } |
| writer.addBinaryReference(newGCGeneration(gen, full, compacted), id, reference); |
| }); |
| } |
| |
| writer.close(); |
| |
| TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), archiveManager); |
| if (reader != null) { |
| reclaimed.addAll(cleaned); |
| return reader; |
| } else { |
| log.warn("Failed to open cleaned up tar file {}", getFileName()); |
| return this; |
| } |
| } |
| |
    /** Closes the underlying archive reader and releases its resources. */
    @Override
    public void close() throws IOException {
        archive.close();
    }
| |
| /** |
| * Loads and parses the optional pre-compiled graph entry from the given tar |
| * file. |
| * |
| * @return The parsed graph, or {@code null} if one was not found. |
| */ |
| Map<UUID, List<UUID>> getGraph() throws IOException { |
| ByteBuffer buffer = archive.getGraph(); |
| if (buffer == null) { |
| return null; |
| } else { |
| return GraphLoader.parseGraph(buffer); |
| } |
| } |
| |
    /** Returns {@code true} if the archive contains a pre-compiled graph entry. */
    private boolean hasGraph() {
        return archive.hasGraph();
    }
| |
| /** |
| * Read the index of binary references from this TAR file. |
| * <p> |
| * The index of binary references is a two-level map. The key to the first |
| * level of the map is the generation. The key to the second level of the |
| * map is the identifier of a data segment in this TAR file. The value of |
| * the second-level map is the set of binary references contained in the |
| * segment. |
| * |
| * @return An instance of {@link Map}. |
| */ |
| BinaryReferencesIndex getBinaryReferences() { |
| BinaryReferencesIndex index = null; |
| try { |
| index = BinaryReferencesIndexLoader.parseBinaryReferencesIndex(archive.getBinaryReferences()); |
| } catch (InvalidBinaryReferencesIndexException | IOException e) { |
| log.warn("Exception while loading binary reference", e); |
| } |
| return index; |
| } |
| |
    /**
     * Return the name of this TAR file.
     *
     * @return The archive name as a {@link String}. (Note: this returns a
     *         plain name, not a {@link File}.)
     */
    String getFileName() {
        return archive.getName();
    }
| |
| //------------------------------------------------------------< Object >-- |
| |
| @Override |
| public String toString() { |
| return getFileName(); |
| } |
| |
| } |