| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.db.compaction.Verifier; |
| import org.apache.cassandra.db.lifecycle.SSTableSet; |
| import org.apache.cassandra.io.sstable.Component; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.KeyIterator; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.format.SSTableWriter; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.utils.Pair; |
| import org.apache.cassandra.utils.concurrent.Refs; |
| |
| public class SSTableImporter |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); |
| |
| private final ColumnFamilyStore cfs; |
| |
| public SSTableImporter(ColumnFamilyStore cfs) |
| { |
| this.cfs = cfs; |
| } |
| |
| /** |
| * Imports sstables from the directories given in options.srcPaths |
| * |
| * If import fails in any of the directories, that directory is skipped and the failed directories |
| * are returned so that the user can re-upload files or remove corrupt files. |
| * |
| * If one of the directories in srcPaths is not readable/does not exist, we exit immediately to let |
| * the user change permissions or similar on the directory. |
| * |
| * @param options |
| * @return list of failed directories |
| */ |
| @VisibleForTesting |
| synchronized List<String> importNewSSTables(Options options) |
| { |
| logger.info("Loading new SSTables for {}/{}: {}", |
| cfs.keyspace.getName(), cfs.getTableName(), options); |
| |
| List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths); |
| |
| Set<Descriptor> currentDescriptors = new HashSet<>(); |
| for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) |
| currentDescriptors.add(sstable.descriptor); |
| List<String> failedDirectories = new ArrayList<>(); |
| |
| // verify first to avoid starting to copy sstables to the data directories and then have to abort. |
| if (options.verifySSTables || options.verifyTokens) |
| { |
| for (Pair<Directories.SSTableLister, String> listerPair : listers) |
| { |
| Directories.SSTableLister lister = listerPair.left; |
| String dir = listerPair.right; |
| for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) |
| { |
| Descriptor descriptor = entry.getKey(); |
| if (!currentDescriptors.contains(entry.getKey())) |
| { |
| try |
| { |
| verifySSTableForImport(descriptor, entry.getValue(), options.verifyTokens, options.verifySSTables, options.extendedVerify); |
| } |
| catch (Throwable t) |
| { |
| if (dir != null) |
| { |
| logger.error("Failed verifying sstable {} in directory {}", descriptor, dir, t); |
| failedDirectories.add(dir); |
| } |
| else |
| { |
| logger.error("Failed verifying sstable {}", descriptor, t); |
| throw new RuntimeException("Failed verifying sstable "+descriptor, t); |
| } |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| Set<SSTableReader> newSSTables = new HashSet<>(); |
| for (Pair<Directories.SSTableLister, String> listerPair : listers) |
| { |
| Directories.SSTableLister lister = listerPair.left; |
| String dir = listerPair.right; |
| if (failedDirectories.contains(dir)) |
| continue; |
| |
| Set<MovedSSTable> movedSSTables = new HashSet<>(); |
| Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>(); |
| for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) |
| { |
| try |
| { |
| Descriptor oldDescriptor = entry.getKey(); |
| if (currentDescriptors.contains(oldDescriptor)) |
| continue; |
| |
| File targetDir = getTargetDirectory(dir, oldDescriptor, entry.getValue()); |
| Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir); |
| maybeMutateMetadata(entry.getKey(), options); |
| movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue())); |
| SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData); |
| newSSTablesPerDirectory.add(sstable); |
| } |
| catch (Throwable t) |
| { |
| newSSTablesPerDirectory.forEach(s -> s.selfRef().release()); |
| if (dir != null) |
| { |
| logger.error("Failed importing sstables in directory {}", dir, t); |
| failedDirectories.add(dir); |
| if (options.copyData) |
| { |
| removeCopiedSSTables(movedSSTables); |
| } |
| else |
| { |
| moveSSTablesBack(movedSSTables); |
| } |
| movedSSTables.clear(); |
| newSSTablesPerDirectory.clear(); |
| break; |
| } |
| else |
| { |
| logger.error("Failed importing sstables from data directory - renamed sstables are: {}", movedSSTables); |
| throw new RuntimeException("Failed importing sstables", t); |
| } |
| } |
| } |
| newSSTables.addAll(newSSTablesPerDirectory); |
| } |
| |
| if (newSSTables.isEmpty()) |
| { |
| logger.info("No new SSTables were found for {}/{}", cfs.keyspace.getName(), cfs.getTableName()); |
| return failedDirectories; |
| } |
| |
| logger.info("Loading new SSTables and building secondary indexes for {}/{}: {}", cfs.keyspace.getName(), cfs.getTableName(), newSSTables); |
| |
| try (Refs<SSTableReader> refs = Refs.ref(newSSTables)) |
| { |
| cfs.getTracker().addSSTables(newSSTables); |
| for (SSTableReader reader : newSSTables) |
| { |
| if (options.invalidateCaches && cfs.isRowCacheEnabled()) |
| invalidateCachesForSSTable(reader.descriptor); |
| } |
| |
| } |
| |
| logger.info("Done loading load new SSTables for {}/{}", cfs.keyspace.getName(), cfs.getTableName()); |
| return failedDirectories; |
| } |
| |
| /** |
| * Opens the sstablereader described by descriptor and figures out the correct directory for it based |
| * on the first token |
| * |
| * srcPath == null means that the sstable is in a data directory and we can use that directly. |
| * |
| * If we fail figuring out the directory we will pick the one with the most available disk space. |
| */ |
| private File getTargetDirectory(String srcPath, Descriptor descriptor, Set<Component> components) |
| { |
| if (srcPath == null) |
| return descriptor.directory; |
| |
| File targetDirectory = null; |
| SSTableReader sstable = null; |
| try |
| { |
| sstable = SSTableReader.open(descriptor, components, cfs.metadata); |
| targetDirectory = cfs.getDirectories().getLocationForDisk(cfs.diskBoundaryManager.getDiskBoundaries(cfs).getCorrectDiskForSSTable(sstable)); |
| } |
| finally |
| { |
| if (sstable != null) |
| sstable.selfRef().release(); |
| } |
| return targetDirectory == null ? cfs.getDirectories().getWriteableLocationToLoadFile(new File(descriptor.baseFilename())) : targetDirectory; |
| } |
| |
| /** |
| * Create SSTableListers based on srcPaths |
| * |
| * If srcPaths is empty, we create a lister that lists sstables in the data directories (deprecated use) |
| */ |
| private List<Pair<Directories.SSTableLister, String>> getSSTableListers(Set<String> srcPaths) |
| { |
| List<Pair<Directories.SSTableLister, String>> listers = new ArrayList<>(); |
| |
| if (!srcPaths.isEmpty()) |
| { |
| for (String path : srcPaths) |
| { |
| File dir = new File(path); |
| if (!dir.exists()) |
| { |
| throw new RuntimeException(String.format("Directory %s does not exist", path)); |
| } |
| if (!Directories.verifyFullPermissions(dir, path)) |
| { |
| throw new RuntimeException("Insufficient permissions on directory " + path); |
| } |
| listers.add(Pair.create(cfs.getDirectories().sstableLister(dir, Directories.OnTxnErr.IGNORE).skipTemporary(true), path)); |
| } |
| } |
| else |
| { |
| listers.add(Pair.create(cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true), null)); |
| } |
| |
| return listers; |
| } |
| |
| private static class MovedSSTable |
| { |
| private final Descriptor newDescriptor; |
| private final Descriptor oldDescriptor; |
| private final Set<Component> components; |
| |
| private MovedSSTable(Descriptor newDescriptor, Descriptor oldDescriptor, Set<Component> components) |
| { |
| this.newDescriptor = newDescriptor; |
| this.oldDescriptor = oldDescriptor; |
| this.components = components; |
| } |
| |
| public String toString() |
| { |
| return String.format("%s moved to %s with components %s", oldDescriptor, newDescriptor, components); |
| } |
| } |
| |
| /** |
| * If we fail when opening the sstable (if for example the user passes in --no-verify and there are corrupt sstables) |
| * we might have started copying sstables to the data directory, these need to be moved back to the original name/directory |
| */ |
| private void moveSSTablesBack(Set<MovedSSTable> movedSSTables) |
| { |
| for (MovedSSTable movedSSTable : movedSSTables) |
| { |
| if (new File(movedSSTable.newDescriptor.filenameFor(Component.DATA)).exists()) |
| { |
| logger.debug("Moving sstable {} back to {}", movedSSTable.newDescriptor.filenameFor(Component.DATA) |
| , movedSSTable.oldDescriptor.filenameFor(Component.DATA)); |
| SSTableWriter.rename(movedSSTable.newDescriptor, movedSSTable.oldDescriptor, movedSSTable.components); |
| } |
| } |
| } |
| |
| /** |
| * Similarly for moving case, we need to delete all SSTables which were copied already but the |
| * copying as a whole has failed so we do not leave any traces behind such failed import. |
| * |
| * @param movedSSTables tables we have moved already (by copying) which need to be removed |
| */ |
| private void removeCopiedSSTables(Set<MovedSSTable> movedSSTables) |
| { |
| logger.debug("Removing copied SSTables which were left in data directories after failed SSTable import."); |
| for (MovedSSTable movedSSTable : movedSSTables) |
| { |
| if (new File(movedSSTable.newDescriptor.filenameFor(Component.DATA)).exists()) |
| { |
| // no logging here as for moveSSTablesBack case above as logging is done in delete method |
| SSTableWriter.delete(movedSSTable.newDescriptor, movedSSTable.components); |
| } |
| } |
| } |
| |
| /** |
| * Iterates over all keys in the sstable index and invalidates the row cache |
| */ |
| @VisibleForTesting |
| void invalidateCachesForSSTable(Descriptor desc) |
| { |
| try (KeyIterator iter = new KeyIterator(desc, cfs.metadata())) |
| { |
| while (iter.hasNext()) |
| { |
| DecoratedKey decoratedKey = iter.next(); |
| cfs.invalidateCachedPartition(decoratedKey); |
| } |
| } |
| } |
| |
| /** |
| * Verify an sstable for import, throws exception if there is a failure verifying. |
| * |
| * @param verifyTokens to verify that the tokens are owned by the current node |
| * @param verifySSTables to verify the sstables given. If this is false a "quick" verification will be run, just deserializing metadata |
| * @param extendedVerify to validate the values in the sstables |
| */ |
| private void verifySSTableForImport(Descriptor descriptor, Set<Component> components, boolean verifyTokens, boolean verifySSTables, boolean extendedVerify) |
| { |
| SSTableReader reader = null; |
| try |
| { |
| reader = SSTableReader.open(descriptor, components, cfs.metadata); |
| Verifier.Options verifierOptions = Verifier.options() |
| .extendedVerification(extendedVerify) |
| .checkOwnsTokens(verifyTokens) |
| .quick(!verifySSTables) |
| .invokeDiskFailurePolicy(false) |
| .mutateRepairStatus(false).build(); |
| try (Verifier verifier = new Verifier(cfs, reader, false, verifierOptions)) |
| { |
| verifier.verify(); |
| } |
| } |
| catch (Throwable t) |
| { |
| throw new RuntimeException("Can't import sstable " + descriptor, t); |
| } |
| finally |
| { |
| if (reader != null) |
| reader.selfRef().release(); |
| } |
| } |
| |
| /** |
| * Depending on the options passed in, this might reset level on the sstable to 0 and/or remove the repair information |
| * from the sstable |
| */ |
| private void maybeMutateMetadata(Descriptor descriptor, Options options) throws IOException |
| { |
| if (new File(descriptor.filenameFor(Component.STATS)).exists()) |
| { |
| if (options.resetLevel) |
| { |
| descriptor.getMetadataSerializer().mutateLevel(descriptor, 0); |
| } |
| if (options.clearRepaired) |
| { |
| descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, |
| null, |
| false); |
| } |
| } |
| } |
| |
| public static class Options |
| { |
| private final Set<String> srcPaths; |
| private final boolean resetLevel; |
| private final boolean clearRepaired; |
| private final boolean verifySSTables; |
| private final boolean verifyTokens; |
| private final boolean invalidateCaches; |
| private final boolean extendedVerify; |
| private final boolean copyData; |
| |
| public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData) |
| { |
| this.srcPaths = srcPaths; |
| this.resetLevel = resetLevel; |
| this.clearRepaired = clearRepaired; |
| this.verifySSTables = verifySSTables; |
| this.verifyTokens = verifyTokens; |
| this.invalidateCaches = invalidateCaches; |
| this.extendedVerify = extendedVerify; |
| this.copyData = copyData; |
| } |
| |
| public static Builder options(String srcDir) |
| { |
| return new Builder(Collections.singleton(srcDir)); |
| } |
| |
| public static Builder options(Set<String> srcDirs) |
| { |
| return new Builder(srcDirs); |
| } |
| |
| public static Builder options() |
| { |
| return options(Collections.emptySet()); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "Options{" + |
| "srcPaths='" + srcPaths + '\'' + |
| ", resetLevel=" + resetLevel + |
| ", clearRepaired=" + clearRepaired + |
| ", verifySSTables=" + verifySSTables + |
| ", verifyTokens=" + verifyTokens + |
| ", invalidateCaches=" + invalidateCaches + |
| ", extendedVerify=" + extendedVerify + |
| ", copyData= " + copyData + |
| '}'; |
| } |
| |
| static class Builder |
| { |
| private final Set<String> srcPaths; |
| private boolean resetLevel = false; |
| private boolean clearRepaired = false; |
| private boolean verifySSTables = false; |
| private boolean verifyTokens = false; |
| private boolean invalidateCaches = false; |
| private boolean extendedVerify = false; |
| private boolean copyData = false; |
| |
| private Builder(Set<String> srcPath) |
| { |
| assert srcPath != null; |
| this.srcPaths = srcPath; |
| } |
| |
| public Builder resetLevel(boolean value) |
| { |
| resetLevel = value; |
| return this; |
| } |
| |
| public Builder clearRepaired(boolean value) |
| { |
| clearRepaired = value; |
| return this; |
| } |
| |
| public Builder verifySSTables(boolean value) |
| { |
| verifySSTables = value; |
| return this; |
| } |
| |
| public Builder verifyTokens(boolean value) |
| { |
| verifyTokens = value; |
| return this; |
| } |
| |
| public Builder invalidateCaches(boolean value) |
| { |
| invalidateCaches = value; |
| return this; |
| } |
| |
| public Builder extendedVerify(boolean value) |
| { |
| extendedVerify = value; |
| return this; |
| } |
| |
| public Builder copyData(boolean value) |
| { |
| copyData = value; |
| return this; |
| } |
| |
| public Options build() |
| { |
| return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData); |
| } |
| } |
| } |
| |
| } |