| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.db.lifecycle.SSTableSet; |
| import org.apache.cassandra.io.sstable.Component; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.IVerifier; |
| import org.apache.cassandra.io.sstable.KeyIterator; |
| import org.apache.cassandra.io.sstable.SSTable; |
| import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.OutputHandler; |
| import org.apache.cassandra.utils.Pair; |
| import org.apache.cassandra.utils.concurrent.Refs; |
| |
| public class SSTableImporter |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); |
| |
| private final ColumnFamilyStore cfs; |
| |
| public SSTableImporter(ColumnFamilyStore cfs) |
| { |
| this.cfs = cfs; |
| } |
| |
| /** |
| * Imports sstables from the directories given in options.srcPaths |
| * |
| * If import fails in any of the directories, that directory is skipped and the failed directories |
| * are returned so that the user can re-upload files or remove corrupt files. |
| * |
| * If one of the directories in srcPaths is not readable/does not exist, we exit immediately to let |
| * the user change permissions or similar on the directory. |
| * |
| * @param options |
| * @return list of failed directories |
| */ |
| @VisibleForTesting |
| synchronized List<String> importNewSSTables(Options options) |
| { |
| UUID importID = UUID.randomUUID(); |
| logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.keyspace.getName(), cfs.getTableName(), options); |
| |
| List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths); |
| |
| Set<Descriptor> currentDescriptors = new HashSet<>(); |
| for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) |
| currentDescriptors.add(sstable.descriptor); |
| List<String> failedDirectories = new ArrayList<>(); |
| |
| // verify first to avoid starting to copy sstables to the data directories and then have to abort. |
| if (options.verifySSTables || options.verifyTokens) |
| { |
| for (Pair<Directories.SSTableLister, String> listerPair : listers) |
| { |
| Directories.SSTableLister lister = listerPair.left; |
| String dir = listerPair.right; |
| for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) |
| { |
| Descriptor descriptor = entry.getKey(); |
| if (!currentDescriptors.contains(entry.getKey())) |
| { |
| try |
| { |
| abortIfDraining(); |
| verifySSTableForImport(descriptor, entry.getValue(), options.verifyTokens, options.verifySSTables, options.extendedVerify); |
| } |
| catch (Throwable t) |
| { |
| if (dir != null) |
| { |
| logger.error("[{}] Failed verifying SSTable {} in directory {}", importID, descriptor, dir, t); |
| failedDirectories.add(dir); |
| } |
| else |
| { |
| logger.error("[{}] Failed verifying SSTable {}", importID, descriptor, t); |
| throw new RuntimeException("Failed verifying SSTable " + descriptor, t); |
| } |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| Set<SSTableReader> newSSTables = new HashSet<>(); |
| for (Pair<Directories.SSTableLister, String> listerPair : listers) |
| { |
| Directories.SSTableLister lister = listerPair.left; |
| String dir = listerPair.right; |
| if (failedDirectories.contains(dir)) |
| continue; |
| |
| Set<MovedSSTable> movedSSTables = new HashSet<>(); |
| Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>(); |
| for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) |
| { |
| try |
| { |
| abortIfDraining(); |
| Descriptor oldDescriptor = entry.getKey(); |
| if (currentDescriptors.contains(oldDescriptor)) |
| continue; |
| |
| File targetDir = getTargetDirectory(dir, oldDescriptor, entry.getValue()); |
| Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir); |
| maybeMutateMetadata(entry.getKey(), options); |
| movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue())); |
| SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData); |
| newSSTablesPerDirectory.add(sstable); |
| } |
| catch (Throwable t) |
| { |
| newSSTablesPerDirectory.forEach(s -> s.selfRef().release()); |
| if (dir != null) |
| { |
| logger.error("[{}] Failed importing sstables in directory {}", importID, dir, t); |
| failedDirectories.add(dir); |
| if (options.copyData) |
| { |
| removeCopiedSSTables(movedSSTables); |
| } |
| else |
| { |
| moveSSTablesBack(movedSSTables); |
| } |
| movedSSTables.clear(); |
| newSSTablesPerDirectory.clear(); |
| break; |
| } |
| else |
| { |
| logger.error("[{}] Failed importing sstables from data directory - renamed SSTables are: {}", importID, movedSSTables, t); |
| throw new RuntimeException("Failed importing SSTables", t); |
| } |
| } |
| } |
| newSSTables.addAll(newSSTablesPerDirectory); |
| } |
| |
| if (newSSTables.isEmpty()) |
| { |
| logger.info("[{}] No new SSTables were found for {}/{}", importID, cfs.keyspace.getName(), cfs.getTableName()); |
| return failedDirectories; |
| } |
| |
| logger.info("[{}] Loading new SSTables and building secondary indexes for {}/{}: {}", importID, cfs.keyspace.getName(), cfs.getTableName(), newSSTables); |
| if (logger.isTraceEnabled()) |
| logLeveling(importID, newSSTables); |
| |
| try (Refs<SSTableReader> refs = Refs.ref(newSSTables)) |
| { |
| abortIfDraining(); |
| cfs.getTracker().addSSTables(newSSTables); |
| for (SSTableReader reader : newSSTables) |
| { |
| if (options.invalidateCaches && cfs.isRowCacheEnabled()) |
| invalidateCachesForSSTable(reader); |
| } |
| } |
| catch (Throwable t) |
| { |
| logger.error("[{}] Failed adding SSTables", importID, t); |
| throw new RuntimeException("Failed adding SSTables", t); |
| } |
| |
| logger.info("[{}] Done loading load new SSTables for {}/{}", importID, cfs.keyspace.getName(), cfs.getTableName()); |
| return failedDirectories; |
| } |
| |
| /** |
| * Check the state of this node and throws an {@link InterruptedException} if it is currently draining |
| * |
| * @throws InterruptedException if the node is draining |
| */ |
| private static void abortIfDraining() throws InterruptedException |
| { |
| if (StorageService.instance.isDraining()) |
| throw new InterruptedException("SSTables import has been aborted"); |
| } |
| |
| private void logLeveling(UUID importID, Set<SSTableReader> newSSTables) |
| { |
| StringBuilder sb = new StringBuilder(); |
| for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) |
| sb.append(formatMetadata(sstable)); |
| logger.debug("[{}] Current sstables: {}", importID, sb); |
| sb = new StringBuilder(); |
| for (SSTableReader sstable : newSSTables) |
| sb.append(formatMetadata(sstable)); |
| logger.debug("[{}] New sstables: {}", importID, sb); |
| } |
| |
| private static String formatMetadata(SSTableReader sstable) |
| { |
| return String.format("{[%s, %s], %d, %s, %d}", |
| sstable.getFirst().getToken(), |
| sstable.getLast().getToken(), |
| sstable.getSSTableLevel(), |
| sstable.isRepaired(), |
| sstable.onDiskLength()); |
| } |
| |
| /** |
| * Opens the sstablereader described by descriptor and figures out the correct directory for it based |
| * on the first token |
| * |
| * srcPath == null means that the sstable is in a data directory and we can use that directly. |
| * |
| * If we fail figuring out the directory we will pick the one with the most available disk space. |
| */ |
| private File getTargetDirectory(String srcPath, Descriptor descriptor, Set<Component> components) |
| { |
| if (srcPath == null) |
| return descriptor.directory; |
| |
| File targetDirectory = null; |
| SSTableReader sstable = null; |
| try |
| { |
| sstable = SSTableReader.open(cfs, descriptor, components, cfs.metadata); |
| targetDirectory = cfs.getDirectories().getLocationForDisk(cfs.diskBoundaryManager.getDiskBoundaries(cfs).getCorrectDiskForSSTable(sstable)); |
| } |
| finally |
| { |
| if (sstable != null) |
| sstable.selfRef().release(); |
| } |
| return targetDirectory == null ? cfs.getDirectories().getWriteableLocationToLoadFile(descriptor.baseFile()) : targetDirectory; |
| } |
| |
| /** |
| * Create SSTableListers based on srcPaths |
| * |
| * If srcPaths is empty, we create a lister that lists sstables in the data directories (deprecated use) |
| */ |
| private List<Pair<Directories.SSTableLister, String>> getSSTableListers(Set<String> srcPaths) |
| { |
| List<Pair<Directories.SSTableLister, String>> listers = new ArrayList<>(); |
| |
| if (!srcPaths.isEmpty()) |
| { |
| for (String path : srcPaths) |
| { |
| File dir = new File(path); |
| if (!dir.exists()) |
| { |
| throw new RuntimeException(String.format("Directory %s does not exist", path)); |
| } |
| if (!Directories.verifyFullPermissions(dir, path)) |
| { |
| throw new RuntimeException("Insufficient permissions on directory " + path); |
| } |
| listers.add(Pair.create(cfs.getDirectories().sstableLister(dir, Directories.OnTxnErr.IGNORE).skipTemporary(true), path)); |
| } |
| } |
| else |
| { |
| listers.add(Pair.create(cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true), null)); |
| } |
| |
| return listers; |
| } |
| |
| private static class MovedSSTable |
| { |
| private final Descriptor newDescriptor; |
| private final Descriptor oldDescriptor; |
| private final Set<Component> components; |
| |
| private MovedSSTable(Descriptor newDescriptor, Descriptor oldDescriptor, Set<Component> components) |
| { |
| this.newDescriptor = newDescriptor; |
| this.oldDescriptor = oldDescriptor; |
| this.components = components; |
| } |
| |
| public String toString() |
| { |
| return String.format("%s moved to %s with components %s", oldDescriptor, newDescriptor, components); |
| } |
| } |
| |
| /** |
| * If we fail when opening the sstable (if for example the user passes in --no-verify and there are corrupt sstables) |
| * we might have started copying sstables to the data directory, these need to be moved back to the original name/directory |
| */ |
| private void moveSSTablesBack(Set<MovedSSTable> movedSSTables) |
| { |
| for (MovedSSTable movedSSTable : movedSSTables) |
| { |
| if (movedSSTable.newDescriptor.fileFor(Components.DATA).exists()) |
| { |
| logger.debug("Moving sstable {} back to {}", movedSSTable.newDescriptor.fileFor(Components.DATA) |
| , movedSSTable.oldDescriptor.fileFor(Components.DATA)); |
| SSTable.rename(movedSSTable.newDescriptor, movedSSTable.oldDescriptor, movedSSTable.components); |
| } |
| } |
| } |
| |
| /** |
| * Similarly for moving case, we need to delete all SSTables which were copied already but the |
| * copying as a whole has failed so we do not leave any traces behind such failed import. |
| * |
| * @param movedSSTables tables we have moved already (by copying) which need to be removed |
| */ |
| private void removeCopiedSSTables(Set<MovedSSTable> movedSSTables) |
| { |
| logger.debug("Removing copied SSTables which were left in data directories after failed SSTable import."); |
| for (MovedSSTable movedSSTable : movedSSTables) |
| { |
| // no logging here as for moveSSTablesBack case above as logging is done in delete method |
| movedSSTable.newDescriptor.getFormat().delete(movedSSTable.newDescriptor); |
| } |
| } |
| |
| /** |
| * Iterates over all keys in the sstable index and invalidates the row cache |
| */ |
| @VisibleForTesting |
| void invalidateCachesForSSTable(SSTableReader reader) |
| { |
| try (KeyIterator iter = reader.keyIterator()) |
| { |
| while (iter.hasNext()) |
| { |
| DecoratedKey decoratedKey = iter.next(); |
| cfs.invalidateCachedPartition(decoratedKey); |
| } |
| } |
| catch (IOException ex) |
| { |
| throw new RuntimeException("Failed to import sstable " + reader.getFilename(), ex); |
| } |
| } |
| |
| /** |
| * Verify an sstable for import, throws exception if there is a failure verifying. |
| * |
| * @param verifyTokens to verify that the tokens are owned by the current node |
| * @param verifySSTables to verify the sstables given. If this is false a "quick" verification will be run, just deserializing metadata |
| * @param extendedVerify to validate the values in the sstables |
| */ |
| private void verifySSTableForImport(Descriptor descriptor, Set<Component> components, boolean verifyTokens, boolean verifySSTables, boolean extendedVerify) |
| { |
| SSTableReader reader = null; |
| try |
| { |
| reader = SSTableReader.open(cfs, descriptor, components, cfs.metadata); |
| IVerifier.Options verifierOptions = IVerifier.options() |
| .extendedVerification(extendedVerify) |
| .checkOwnsTokens(verifyTokens) |
| .quick(!verifySSTables) |
| .invokeDiskFailurePolicy(false) |
| .mutateRepairStatus(false).build(); |
| |
| try (IVerifier verifier = reader.getVerifier(cfs, new OutputHandler.LogOutput(), false, verifierOptions)) |
| { |
| verifier.verify(); |
| } |
| } |
| catch (Throwable t) |
| { |
| throw new RuntimeException("Can't import sstable " + descriptor, t); |
| } |
| finally |
| { |
| if (reader != null) |
| reader.selfRef().release(); |
| } |
| } |
| |
| /** |
| * Depending on the options passed in, this might reset level on the sstable to 0 and/or remove the repair information |
| * from the sstable |
| */ |
| private void maybeMutateMetadata(Descriptor descriptor, Options options) throws IOException |
| { |
| if (descriptor.fileFor(Components.STATS).exists()) |
| { |
| if (options.resetLevel) |
| { |
| descriptor.getMetadataSerializer().mutateLevel(descriptor, 0); |
| } |
| if (options.clearRepaired) |
| { |
| descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, |
| null, |
| false); |
| } |
| } |
| } |
| |
| public static class Options |
| { |
| private final Set<String> srcPaths; |
| private final boolean resetLevel; |
| private final boolean clearRepaired; |
| private final boolean verifySSTables; |
| private final boolean verifyTokens; |
| private final boolean invalidateCaches; |
| private final boolean extendedVerify; |
| private final boolean copyData; |
| |
| public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData) |
| { |
| this.srcPaths = srcPaths; |
| this.resetLevel = resetLevel; |
| this.clearRepaired = clearRepaired; |
| this.verifySSTables = verifySSTables; |
| this.verifyTokens = verifyTokens; |
| this.invalidateCaches = invalidateCaches; |
| this.extendedVerify = extendedVerify; |
| this.copyData = copyData; |
| } |
| |
| public static Builder options(String srcDir) |
| { |
| return new Builder(Collections.singleton(srcDir)); |
| } |
| |
| public static Builder options(Set<String> srcDirs) |
| { |
| return new Builder(srcDirs); |
| } |
| |
| public static Builder options() |
| { |
| return options(Collections.emptySet()); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "Options{" + |
| "srcPaths='" + srcPaths + '\'' + |
| ", resetLevel=" + resetLevel + |
| ", clearRepaired=" + clearRepaired + |
| ", verifySSTables=" + verifySSTables + |
| ", verifyTokens=" + verifyTokens + |
| ", invalidateCaches=" + invalidateCaches + |
| ", extendedVerify=" + extendedVerify + |
| ", copyData= " + copyData + |
| '}'; |
| } |
| |
| static class Builder |
| { |
| private final Set<String> srcPaths; |
| private boolean resetLevel = false; |
| private boolean clearRepaired = false; |
| private boolean verifySSTables = false; |
| private boolean verifyTokens = false; |
| private boolean invalidateCaches = false; |
| private boolean extendedVerify = false; |
| private boolean copyData = false; |
| |
| private Builder(Set<String> srcPath) |
| { |
| assert srcPath != null; |
| this.srcPaths = srcPath; |
| } |
| |
| public Builder resetLevel(boolean value) |
| { |
| resetLevel = value; |
| return this; |
| } |
| |
| public Builder clearRepaired(boolean value) |
| { |
| clearRepaired = value; |
| return this; |
| } |
| |
| public Builder verifySSTables(boolean value) |
| { |
| verifySSTables = value; |
| return this; |
| } |
| |
| public Builder verifyTokens(boolean value) |
| { |
| verifyTokens = value; |
| return this; |
| } |
| |
| public Builder invalidateCaches(boolean value) |
| { |
| invalidateCaches = value; |
| return this; |
| } |
| |
| public Builder extendedVerify(boolean value) |
| { |
| extendedVerify = value; |
| return this; |
| } |
| |
| public Builder copyData(boolean value) |
| { |
| copyData = value; |
| return this; |
| } |
| |
| public Options build() |
| { |
| return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData); |
| } |
| } |
| } |
| |
| } |