| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.gc; |
| |
| import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.UnknownHostException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.Instance; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.MutationsRejectedException; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.impl.Tables; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.conf.SiteConfiguration; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.PartialKey; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; |
| import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor; |
| import org.apache.accumulo.core.gc.thrift.GCStatus; |
| import org.apache.accumulo.core.gc.thrift.GcCycleStats; |
| import org.apache.accumulo.core.master.state.tables.TableState; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.RootTable; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; |
| import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; |
| import org.apache.accumulo.core.replication.ReplicationTable; |
| import org.apache.accumulo.core.replication.ReplicationTableOfflineException; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.security.thrift.TCredentials; |
| import org.apache.accumulo.core.trace.DistributedTrace; |
| import org.apache.accumulo.core.trace.ProbabilitySampler; |
| import org.apache.accumulo.core.trace.Span; |
| import org.apache.accumulo.core.trace.Trace; |
| import org.apache.accumulo.core.trace.thrift.TInfo; |
| import org.apache.accumulo.core.util.NamingThreadFactory; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.accumulo.core.util.ServerServices; |
| import org.apache.accumulo.core.util.ServerServices.Service; |
| import org.apache.accumulo.core.volume.Volume; |
| import org.apache.accumulo.core.zookeeper.ZooUtil; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; |
| import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences; |
| import org.apache.accumulo.server.Accumulo; |
| import org.apache.accumulo.server.AccumuloServerContext; |
| import org.apache.accumulo.server.ServerConstants; |
| import org.apache.accumulo.server.ServerOpts; |
| import org.apache.accumulo.server.client.HdfsZooInstance; |
| import org.apache.accumulo.server.conf.ServerConfigurationFactory; |
| import org.apache.accumulo.server.fs.VolumeManager; |
| import org.apache.accumulo.server.fs.VolumeManager.FileType; |
| import org.apache.accumulo.server.fs.VolumeManagerImpl; |
| import org.apache.accumulo.server.fs.VolumeUtil; |
| import org.apache.accumulo.server.replication.proto.Replication.Status; |
| import org.apache.accumulo.server.rpc.RpcWrapper; |
| import org.apache.accumulo.server.rpc.ServerAddress; |
| import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; |
| import org.apache.accumulo.server.rpc.TServerUtils; |
| import org.apache.accumulo.server.rpc.ThriftServerType; |
| import org.apache.accumulo.server.security.SecurityUtil; |
| import org.apache.accumulo.server.tables.TableManager; |
| import org.apache.accumulo.server.util.Halt; |
| import org.apache.accumulo.server.util.TabletIterator; |
| import org.apache.accumulo.server.zookeeper.ZooLock; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.beust.jcommander.Parameter; |
| import com.google.common.base.Function; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Maps; |
| import com.google.common.net.HostAndPort; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| |
| public class SimpleGarbageCollector extends AccumuloServerContext implements Iface { |
| private static final Text EMPTY_TEXT = new Text(); |
| |
| /** |
| * Options for the garbage collector. |
| */ |
| static class Opts extends ServerOpts { |
| @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also") |
| boolean verbose = false; |
| @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files") |
| boolean safeMode = false; |
| } |
| |
| /** |
| * A fraction representing how much of the JVM's available memory should be used for gathering candidates. |
| */ |
| static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f; |
| |
| private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollector.class); |
| |
| private VolumeManager fs; |
| private Opts opts = new Opts(); |
| private ZooLock lock; |
| |
| private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); |
| |
| public static void main(String[] args) throws UnknownHostException, IOException { |
| SecurityUtil.serverLogin(SiteConfiguration.getInstance()); |
| final String app = "gc"; |
| Accumulo.setupLogging(app); |
| Instance instance = HdfsZooInstance.getInstance(); |
| ServerConfigurationFactory conf = new ServerConfigurationFactory(instance); |
| log.info("Version " + Constants.VERSION); |
| log.info("Instance " + instance.getInstanceID()); |
| final VolumeManager fs = VolumeManagerImpl.get(); |
| Accumulo.init(fs, conf, app); |
| Opts opts = new Opts(); |
| opts.parseArgs(app, args); |
| SimpleGarbageCollector gc = new SimpleGarbageCollector(opts, fs, conf); |
| |
| DistributedTrace.enable(opts.getAddress(), app, conf.getConfiguration()); |
| try { |
| gc.run(); |
| } finally { |
| DistributedTrace.disable(); |
| } |
| } |
| |
| /** |
| * Creates a new garbage collector. |
| * |
| * @param opts |
| * options |
| */ |
| public SimpleGarbageCollector(Opts opts, VolumeManager fs, ServerConfigurationFactory confFactory) { |
| super(confFactory); |
| this.opts = opts; |
| this.fs = fs; |
| |
| long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); |
| log.info("start delay: " + getStartDelay() + " milliseconds"); |
| log.info("time delay: " + gcDelay + " milliseconds"); |
| log.info("safemode: " + opts.safeMode); |
| log.info("verbose: " + opts.verbose); |
| log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes"); |
| log.info("delete threads: " + getNumDeleteThreads()); |
| } |
| |
| /** |
| * Gets the delay before the first collection. |
| * |
| * @return start delay, in milliseconds |
| */ |
| long getStartDelay() { |
| return getConfiguration().getTimeInMillis(Property.GC_CYCLE_START); |
| } |
| |
| /** |
| * Gets the volume manager used by this GC. |
| * |
| * @return volume manager |
| */ |
| VolumeManager getVolumeManager() { |
| return fs; |
| } |
| |
| /** |
| * Checks if the volume manager should move files to the trash rather than delete them. |
| * |
| * @return true if trash is used |
| */ |
| boolean isUsingTrash() { |
| return !getConfiguration().getBoolean(Property.GC_TRASH_IGNORE); |
| } |
| |
| /** |
| * Gets the options for this garbage collector. |
| */ |
| Opts getOpts() { |
| return opts; |
| } |
| |
| /** |
| * Gets the number of threads used for deleting files. |
| * |
| * @return number of delete threads |
| */ |
| int getNumDeleteThreads() { |
| return getConfiguration().getCount(Property.GC_DELETE_THREADS); |
| } |
| |
| /** |
| * Should files be archived (as opposed to preserved in trash) |
| * |
| * @return True if files should be archived, false otherwise |
| */ |
| boolean shouldArchiveFiles() { |
| return getConfiguration().getBoolean(Property.GC_FILE_ARCHIVE); |
| } |
| |
| private class GCEnv implements GarbageCollectionEnvironment { |
| |
| private String tableName; |
| |
| GCEnv(String tableName) { |
| this.tableName = tableName; |
| } |
| |
| @Override |
| public boolean getCandidates(String continuePoint, List<String> result) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them, |
| // then will never inspect deletes after N |
| Range range = MetadataSchema.DeletesSection.getRange(); |
| if (continuePoint != null && !continuePoint.isEmpty()) { |
| String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint; |
| range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()); |
| } |
| |
| Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); |
| scanner.setRange(range); |
| result.clear(); |
| // find candidates for deletion; chop off the prefix |
| for (Entry<Key,Value> entry : scanner) { |
| String cand = entry.getKey().getRow().toString().substring(MetadataSchema.DeletesSection.getRowPrefix().length()); |
| result.add(cand); |
| if (almostOutOfMemory(Runtime.getRuntime())) { |
| log.info("List of delete candidates has exceeded the memory threshold. Attempting to delete what has been gathered so far."); |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| @SuppressWarnings("resource") |
| IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY)); |
| |
| scanner.setRange(MetadataSchema.BlipSection.getRange()); |
| |
| return Iterators.transform(scanner.iterator(), new Function<Entry<Key,Value>,String>() { |
| @Override |
| public String apply(Entry<Key,Value> entry) { |
| return entry.getKey().getRow().toString().substring(MetadataSchema.BlipSection.getRowPrefix().length()); |
| } |
| }); |
| } |
| |
| @Override |
| public Iterator<Entry<Key,Value>> getReferenceIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY)); |
| scanner.fetchColumnFamily(DataFileColumnFamily.NAME); |
| scanner.fetchColumnFamily(ScanFileColumnFamily.NAME); |
| TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); |
| TabletIterator tabletIterator = new TabletIterator(scanner, MetadataSchema.TabletsSection.getRange(), false, true); |
| |
| return Iterators.concat(Iterators.transform(tabletIterator, new Function<Map<Key,Value>,Iterator<Entry<Key,Value>>>() { |
| @Override |
| public Iterator<Entry<Key,Value>> apply(Map<Key,Value> input) { |
| return input.entrySet().iterator(); |
| } |
| })); |
| } |
| |
| @Override |
| public Set<String> getTableIDs() { |
| return Tables.getIdToNameMap(getInstance()).keySet(); |
| } |
| |
| @Override |
| public void delete(SortedMap<String,String> confirmedDeletes) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| if (opts.safeMode) { |
| if (opts.verbose) |
| System.out.println("SAFEMODE: There are " + confirmedDeletes.size() + " data file candidates marked for deletion.%n" |
| + " Examine the log files to identify them.%n"); |
| log.info("SAFEMODE: Listing all data file candidates for deletion"); |
| for (String s : confirmedDeletes.values()) |
| log.info("SAFEMODE: " + s); |
| log.info("SAFEMODE: End candidates for deletion"); |
| return; |
| } |
| |
| Connector c = getConnector(); |
| BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig()); |
| |
| // when deleting a dir and all files in that dir, only need to delete the dir |
| // the dir will sort right before the files... so remove the files in this case |
| // to minimize namenode ops |
| Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator(); |
| |
| String lastDir = null; |
| while (cdIter.hasNext()) { |
| Entry<String,String> entry = cdIter.next(); |
| String relPath = entry.getKey(); |
| String absPath = fs.getFullPath(FileType.TABLE, entry.getValue()).toString(); |
| |
| if (isDir(relPath)) { |
| lastDir = absPath; |
| } else if (lastDir != null) { |
| if (absPath.startsWith(lastDir)) { |
| log.debug("Ignoring " + entry.getValue() + " because " + lastDir + " exist"); |
| try { |
| putMarkerDeleteMutation(entry.getValue(), writer); |
| } catch (MutationsRejectedException e) { |
| throw new RuntimeException(e); |
| } |
| cdIter.remove(); |
| } else { |
| lastDir = null; |
| } |
| } |
| } |
| |
| final BatchWriter finalWriter = writer; |
| |
| ExecutorService deleteThreadPool = Executors.newFixedThreadPool(getNumDeleteThreads(), new NamingThreadFactory("deleting")); |
| |
| final List<Pair<Path,Path>> replacements = ServerConstants.getVolumeReplacements(); |
| |
| for (final String delete : confirmedDeletes.values()) { |
| |
| Runnable deleteTask = new Runnable() { |
| @Override |
| public void run() { |
| boolean removeFlag; |
| |
| try { |
| Path fullPath; |
| String switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements); |
| if (switchedDelete != null) { |
| // actually replacing the volumes in the metadata table would be tricky because the entries would be different rows. So it could not be |
| // atomically in one mutation and extreme care would need to be taken that delete entry was not lost. Instead of doing that, just deal with |
| // volume switching when something needs to be deleted. Since the rest of the code uses suffixes to compare delete entries, there is no danger |
| // of deleting something that should not be deleted. Must not change value of delete variable because thats whats stored in metadata table. |
| log.debug("Volume replaced " + delete + " -> " + switchedDelete); |
| fullPath = fs.getFullPath(FileType.TABLE, switchedDelete); |
| } else { |
| fullPath = fs.getFullPath(FileType.TABLE, delete); |
| } |
| |
| log.debug("Deleting " + fullPath); |
| |
| if (archiveOrMoveToTrash(fullPath) || fs.deleteRecursively(fullPath)) { |
| // delete succeeded, still want to delete |
| removeFlag = true; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.deleted; |
| } |
| } else if (fs.exists(fullPath)) { |
| // leave the entry in the metadata; we'll try again later |
| removeFlag = false; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.errors; |
| } |
| log.warn("File exists, but was not deleted for an unknown reason: " + fullPath); |
| } else { |
| // this failure, we still want to remove the metadata entry |
| removeFlag = true; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.errors; |
| } |
| String parts[] = fullPath.toString().split(Constants.ZTABLES)[1].split("/"); |
| if (parts.length > 2) { |
| String tableId = parts[1]; |
| String tabletDir = parts[2]; |
| TableManager.getInstance().updateTableStateCache(tableId); |
| TableState tableState = TableManager.getInstance().getTableState(tableId); |
| if (tableState != null && tableState != TableState.DELETING) { |
| // clone directories don't always exist |
| if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) |
| log.debug("File doesn't exist: " + fullPath); |
| } |
| } else { |
| log.warn("Very strange path name: " + delete); |
| } |
| } |
| |
| // proceed to clearing out the flags for successful deletes and |
| // non-existent files |
| if (removeFlag && finalWriter != null) { |
| putMarkerDeleteMutation(delete, finalWriter); |
| } |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } |
| |
| } |
| |
| }; |
| |
| deleteThreadPool.execute(deleteTask); |
| } |
| |
| deleteThreadPool.shutdown(); |
| |
| try { |
| while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {} |
| } catch (InterruptedException e1) { |
| log.error("{}", e1.getMessage(), e1); |
| } |
| |
| if (writer != null) { |
| try { |
| writer.close(); |
| } catch (MutationsRejectedException e) { |
| log.error("Problem removing entries from the metadata table: ", e); |
| } |
| } |
| } |
| |
| @Override |
| public void deleteTableDirIfEmpty(String tableID) throws IOException { |
| // if dir exist and is empty, then empty list is returned... |
| // hadoop 2.0 will throw an exception if the file does not exist |
| for (String dir : ServerConstants.getTablesDirs()) { |
| FileStatus[] tabletDirs = null; |
| try { |
| tabletDirs = fs.listStatus(new Path(dir + "/" + tableID)); |
| } catch (FileNotFoundException ex) { |
| continue; |
| } |
| |
| if (tabletDirs.length == 0) { |
| Path p = new Path(dir + "/" + tableID); |
| log.debug("Removing table dir " + p); |
| if (!archiveOrMoveToTrash(p)) |
| fs.delete(p); |
| } |
| } |
| } |
| |
| @Override |
| public void incrementCandidatesStat(long i) { |
| status.current.candidates += i; |
| } |
| |
| @Override |
| public void incrementInUseStat(long i) { |
| status.current.inUse += i; |
| } |
| |
| @Override |
| public Iterator<Entry<String,Status>> getReplicationNeededIterator() throws AccumuloException, AccumuloSecurityException { |
| Connector conn = getConnector(); |
| try { |
| Scanner s = ReplicationTable.getScanner(conn); |
| StatusSection.limit(s); |
| return Iterators.transform(s.iterator(), new Function<Entry<Key,Value>,Entry<String,Status>>() { |
| |
| @Override |
| public Entry<String,Status> apply(Entry<Key,Value> input) { |
| String file = input.getKey().getRow().toString(); |
| Status stat; |
| try { |
| stat = Status.parseFrom(input.getValue().get()); |
| } catch (InvalidProtocolBufferException e) { |
| log.warn("Could not deserialize protobuf for: " + input.getKey()); |
| stat = null; |
| } |
| return Maps.immutableEntry(file, stat); |
| } |
| |
| }); |
| } catch (ReplicationTableOfflineException e) { |
| // No elements that we need to preclude |
| return Iterators.emptyIterator(); |
| } |
| } |
| |
| } |
| |
| private void run() { |
| long tStart, tStop; |
| |
| // Sleep for an initial period, giving the master time to start up and |
| // old data files to be unused |
| log.info("Trying to acquire ZooKeeper lock for garbage collector"); |
| |
| try { |
| getZooLock(startStatsService()); |
| } catch (Exception ex) { |
| log.error("{}", ex.getMessage(), ex); |
| System.exit(1); |
| } |
| |
| try { |
| long delay = getStartDelay(); |
| log.debug("Sleeping for " + delay + " milliseconds before beginning garbage collection cycles"); |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| log.warn("{}", e.getMessage(), e); |
| return; |
| } |
| |
| ProbabilitySampler sampler = new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT)); |
| |
| while (true) { |
| Trace.on("gc", sampler); |
| |
| Span gcSpan = Trace.start("loop"); |
| tStart = System.currentTimeMillis(); |
| try { |
| System.gc(); // make room |
| |
| status.current.started = System.currentTimeMillis(); |
| |
| new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME)); |
| new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME)); |
| |
| log.info("Number of data file candidates for deletion: " + status.current.candidates); |
| log.info("Number of data file candidates still in use: " + status.current.inUse); |
| log.info("Number of successfully deleted data files: " + status.current.deleted); |
| log.info("Number of data files delete failures: " + status.current.errors); |
| |
| status.current.finished = System.currentTimeMillis(); |
| status.last = status.current; |
| status.current = new GcCycleStats(); |
| |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } |
| |
| tStop = System.currentTimeMillis(); |
| log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0))); |
| |
| // We want to prune references to fully-replicated WALs from the replication table which are no longer referenced in the metadata table |
| // before running GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. |
| Span replSpan = Trace.start("replicationClose"); |
| try { |
| CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(this); |
| closeWals.run(); |
| } catch (Exception e) { |
| log.error("Error trying to close write-ahead logs for replication table", e); |
| } finally { |
| replSpan.stop(); |
| } |
| |
| Span waLogs = Trace.start("walogs"); |
| try { |
| GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash()); |
| log.info("Beginning garbage collection of write-ahead logs"); |
| walogCollector.collect(status); |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } finally { |
| waLogs.stop(); |
| } |
| gcSpan.stop(); |
| |
| // we just made a lot of metadata changes: flush them out |
| try { |
| Connector connector = getConnector(); |
| connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true); |
| connector.tableOperations().compact(RootTable.NAME, null, null, true, true); |
| } catch (Exception e) { |
| log.warn("{}", e.getMessage(), e); |
| } |
| |
| Trace.off(); |
| try { |
| long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); |
| log.debug("Sleeping for " + gcDelay + " milliseconds"); |
| Thread.sleep(gcDelay); |
| } catch (InterruptedException e) { |
| log.warn("{}", e.getMessage(), e); |
| return; |
| } |
| } |
| } |
| |
| /** |
| * Moves a file to trash. If this garbage collector is not using trash, this method returns false and leaves the file alone. If the file is missing, this |
| * method returns false as opposed to throwing an exception. |
| * |
| * @return true if the file was moved to trash |
| * @throws IOException |
| * if the volume manager encountered a problem |
| */ |
| boolean archiveOrMoveToTrash(Path path) throws IOException { |
| if (shouldArchiveFiles()) { |
| return archiveFile(path); |
| } else { |
| if (!isUsingTrash()) |
| return false; |
| try { |
| return fs.moveToTrash(path); |
| } catch (FileNotFoundException ex) { |
| return false; |
| } |
| } |
| } |
| |
| /** |
| * Move a file, that would otherwise be deleted, to the archive directory for files |
| * |
| * @param fileToArchive |
| * Path to file that is to be archived |
| * @return True if the file was successfully moved to the file archive directory, false otherwise |
| */ |
| boolean archiveFile(Path fileToArchive) throws IOException { |
| // Figure out what the base path this volume uses on this FileSystem |
| Volume sourceVolume = fs.getVolumeByPath(fileToArchive); |
| String sourceVolumeBasePath = sourceVolume.getBasePath(); |
| |
| log.debug("Base path for volume: " + sourceVolumeBasePath); |
| |
| // Get the path for the file we want to archive |
| String sourcePathBasePath = fileToArchive.toUri().getPath(); |
| |
| // Strip off the common base path for the file to archive |
| String relativeVolumePath = sourcePathBasePath.substring(sourceVolumeBasePath.length()); |
| if (Path.SEPARATOR_CHAR == relativeVolumePath.charAt(0)) { |
| if (relativeVolumePath.length() > 1) { |
| relativeVolumePath = relativeVolumePath.substring(1); |
| } else { |
| relativeVolumePath = ""; |
| } |
| } |
| |
| log.debug("Computed relative path for file to archive: " + relativeVolumePath); |
| |
| // The file archive path on this volume (we can't archive this file to a different volume) |
| Path archivePath = new Path(sourceVolumeBasePath, ServerConstants.FILE_ARCHIVE_DIR); |
| |
| log.debug("File archive path: " + archivePath); |
| |
| fs.mkdirs(archivePath); |
| |
| // Preserve the path beneath the Volume's base directory (e.g. tables/1/A_0000001.rf) |
| Path fileArchivePath = new Path(archivePath, relativeVolumePath); |
| |
| log.debug("Create full path of " + fileArchivePath + " from " + archivePath + " and " + relativeVolumePath); |
| |
| // Make sure that it doesn't already exist, something is wrong. |
| if (fs.exists(fileArchivePath)) { |
| log.warn("Tried to archive file, but it already exists: " + fileArchivePath); |
| return false; |
| } |
| |
| log.debug("Moving " + fileToArchive + " to " + fileArchivePath); |
| return fs.rename(fileToArchive, fileArchivePath); |
| } |
| |
| private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { |
| String path = ZooUtil.getRoot(getInstance()) + Constants.ZGC_LOCK; |
| |
| LockWatcher lockWatcher = new LockWatcher() { |
| @Override |
| public void lostLock(LockLossReason reason) { |
| Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); |
| } |
| |
| @Override |
| public void unableToMonitorLockNode(final Throwable e) { |
| // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility |
| Halt.halt(-1, new Runnable() { |
| |
| @Override |
| public void run() { |
| log.error("FATAL: No longer able to monitor lock node ", e); |
| } |
| }); |
| |
| } |
| }; |
| |
| while (true) { |
| lock = new ZooLock(path); |
| if (lock.tryLock(lockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) { |
| log.debug("Got GC ZooKeeper lock"); |
| return; |
| } |
| log.debug("Failed to get GC ZooKeeper lock, will retry"); |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| } |
| |
| private HostAndPort startStatsService() throws UnknownHostException { |
| Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this)); |
| final Processor<Iface> processor; |
| if (ThriftServerType.SASL == getThriftServerType()) { |
| Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); |
| processor = new Processor<>(tcProxy); |
| } else { |
| processor = new Processor<>(rpcProxy); |
| } |
| int port[] = getConfiguration().getPort(Property.GC_PORT); |
| HostAndPort[] addresses = TServerUtils.getHostAndPorts(this.opts.getAddress(), port); |
| long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); |
| try { |
| ServerAddress server = TServerUtils.startTServer(getConfiguration(), getThriftServerType(), processor, this.getClass().getSimpleName(), |
| "GC Monitor Service", 2, getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), |
| getSaslParams(), 0, addresses); |
| log.debug("Starting garbage collector listening on " + server.address); |
| return server.address; |
| } catch (Exception ex) { |
| // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility |
| log.error("FATAL:", ex); |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * Checks if the system is almost out of memory. |
| * |
| * @param runtime |
| * Java runtime |
| * @return true if system is almost out of memory |
| * @see #CANDIDATE_MEMORY_PERCENTAGE |
| */ |
| static boolean almostOutOfMemory(Runtime runtime) { |
| return runtime.totalMemory() - runtime.freeMemory() > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory(); |
| } |
| |
| private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer) throws MutationsRejectedException { |
| Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + delete); |
| m.putDelete(EMPTY_TEXT, EMPTY_TEXT); |
| writer.addMutation(m); |
| } |
| |
| /** |
| * Checks if the given string is a directory. |
| * |
| * @param delete |
| * possible directory |
| * @return true if string is a directory |
| */ |
| static boolean isDir(String delete) { |
| if (delete == null) { |
| return false; |
| } |
| int slashCount = 0; |
| for (int i = 0; i < delete.length(); i++) |
| if (delete.charAt(i) == '/') |
| slashCount++; |
| return slashCount == 1; |
| } |
| |
| @Override |
| public GCStatus getStatus(TInfo info, TCredentials credentials) { |
| return status; |
| } |
| } |