| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.gc; |
| |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| import java.util.stream.StreamSupport; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.clientImpl.Tables; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; |
| import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor; |
| import org.apache.accumulo.core.gc.thrift.GCStatus; |
| import org.apache.accumulo.core.gc.thrift.GcCycleStats; |
| import org.apache.accumulo.core.manager.state.tables.TableState; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.RootTable; |
| import org.apache.accumulo.core.metadata.TabletFileUtil; |
| import org.apache.accumulo.core.metadata.schema.Ample; |
| import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; |
| import org.apache.accumulo.core.metadata.schema.TabletMetadata; |
| import org.apache.accumulo.core.metadata.schema.TabletsMetadata; |
| import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; |
| import org.apache.accumulo.core.replication.ReplicationTable; |
| import org.apache.accumulo.core.replication.ReplicationTableOfflineException; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.securityImpl.thrift.TCredentials; |
| import org.apache.accumulo.core.trace.TraceUtil; |
| import org.apache.accumulo.core.trace.thrift.TInfo; |
| import org.apache.accumulo.core.util.Halt; |
| import org.apache.accumulo.core.util.HostAndPort; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.accumulo.core.util.ServerServices; |
| import org.apache.accumulo.core.util.ServerServices.Service; |
| import org.apache.accumulo.core.util.threads.ThreadPools; |
| import org.apache.accumulo.core.volume.Volume; |
| import org.apache.accumulo.fate.zookeeper.ZooLock; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; |
| import org.apache.accumulo.gc.metrics.GcCycleMetrics; |
| import org.apache.accumulo.gc.metrics.GcMetricsFactory; |
| import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences; |
| import org.apache.accumulo.server.AbstractServer; |
| import org.apache.accumulo.server.ServerConstants; |
| import org.apache.accumulo.server.ServerOpts; |
| import org.apache.accumulo.server.fs.VolumeManager; |
| import org.apache.accumulo.server.fs.VolumeManager.FileType; |
| import org.apache.accumulo.server.fs.VolumeUtil; |
| import org.apache.accumulo.server.gc.GcVolumeUtil; |
| import org.apache.accumulo.server.manager.LiveTServerSet; |
| import org.apache.accumulo.server.replication.proto.Replication.Status; |
| import org.apache.accumulo.server.rpc.ServerAddress; |
| import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; |
| import org.apache.accumulo.server.rpc.TServerUtils; |
| import org.apache.accumulo.server.rpc.ThriftServerType; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.htrace.Trace; |
| import org.apache.htrace.TraceScope; |
| import org.apache.htrace.impl.ProbabilitySampler; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Maps; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
// Could/Should implement HighlyAvailableService, but the Thrift server is already started before
// the ZK lock is acquired. The server is only used for metrics, so there are no concerns about
// clients using the service before the lock is acquired.
| public class SimpleGarbageCollector extends AbstractServer implements Iface { |
| |
| private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollector.class); |
| |
| private ZooLock lock; |
| |
| private GCStatus status = |
| new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); |
| |
| private GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); |
| |
| public static void main(String[] args) throws Exception { |
| try (SimpleGarbageCollector gc = new SimpleGarbageCollector(new ServerOpts(), args)) { |
| gc.runServer(); |
| } |
| } |
| |
| SimpleGarbageCollector(ServerOpts opts, String[] args) { |
| super("gc", opts, args); |
| |
| final AccumuloConfiguration conf = getConfiguration(); |
| |
| boolean gcMetricsRegistered = new GcMetricsFactory(conf).register(this); |
| |
| if (gcMetricsRegistered) { |
| log.info("gc metrics modules registered with metrics system"); |
| } else { |
| log.info("Failed to register gc metrics module"); |
| } |
| |
| final long gcDelay = conf.getTimeInMillis(Property.GC_CYCLE_DELAY); |
| final String useFullCompaction = conf.get(Property.GC_USE_FULL_COMPACTION); |
| |
| log.info("start delay: {} milliseconds", getStartDelay()); |
| log.info("time delay: {} milliseconds", gcDelay); |
| log.info("safemode: {}", inSafeMode()); |
| log.info("candidate batch size: {} bytes", getCandidateBatchSize()); |
| log.info("delete threads: {}", getNumDeleteThreads()); |
| log.info("gc post metadata action: {}", useFullCompaction); |
| } |
| |
| /** |
| * Gets the delay before the first collection. |
| * |
| * @return start delay, in milliseconds |
| */ |
| long getStartDelay() { |
| return getConfiguration().getTimeInMillis(Property.GC_CYCLE_START); |
| } |
| |
| /** |
| * Checks if the volume manager should move files to the trash rather than delete them. |
| * |
| * @return true if trash is used |
| */ |
| boolean isUsingTrash() { |
| return !getConfiguration().getBoolean(Property.GC_TRASH_IGNORE); |
| } |
| |
| /** |
| * Gets the number of threads used for deleting files. |
| * |
| * @return number of delete threads |
| */ |
| int getNumDeleteThreads() { |
| return getConfiguration().getCount(Property.GC_DELETE_THREADS); |
| } |
| |
| /** |
| * Gets the batch size for garbage collecting. |
| * |
| * @return candidate batch size. |
| */ |
| long getCandidateBatchSize() { |
| return getConfiguration().getAsBytes(Property.GC_CANDIDATE_BATCH_SIZE); |
| } |
| |
| /** |
| * Checks if safemode is set - files will not be deleted. |
| * |
| * @return number of delete threads |
| */ |
| boolean inSafeMode() { |
| return getConfiguration().getBoolean(Property.GC_SAFEMODE); |
| } |
| |
| private class GCEnv implements GarbageCollectionEnvironment { |
| |
| private DataLevel level; |
| |
| GCEnv(Ample.DataLevel level) { |
| this.level = level; |
| } |
| |
| @Override |
| public boolean getCandidates(String continuePoint, List<String> result) |
| throws TableNotFoundException { |
| |
| Iterator<String> candidates = getContext().getAmple().getGcCandidates(level, continuePoint); |
| long candidateLength = 0; |
      // Convert the batch size in bytes to an approximate number of characters (a Java char is
      // two bytes).
| long candidateBatchSize = getCandidateBatchSize() / 2; |
| |
| result.clear(); |
| |
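      // Accumulate candidates until the approximate batch size is exceeded; returning true
      // signals that more candidates remain beyond this batch.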
| while (candidates.hasNext()) { |
| String candidate = candidates.next(); |
| candidateLength += candidate.length(); |
| result.add(candidate); |
| if (candidateLength > candidateBatchSize) { |
| log.info( |
| "Candidate batch of size {} has exceeded the" |
| + " threshold. Attempting to delete what has been gathered so far.", |
| candidateLength); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public Stream<String> getBlipPaths() throws TableNotFoundException { |
| |
| if (level == DataLevel.ROOT) { |
| return Stream.empty(); |
| } |
| |
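      // BLIP (bulk load in progress) markers are rows in the metadata table; strip the row
      // prefix to recover the bulk import directory path.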
| int blipPrefixLen = BlipSection.getRowPrefix().length(); |
| var scanner = |
| new IsolatedScanner(getContext().createScanner(level.metaTable(), Authorizations.EMPTY)); |
| scanner.setRange(BlipSection.getRange()); |
| return StreamSupport.stream(scanner.spliterator(), false) |
| .map(entry -> entry.getKey().getRow().toString().substring(blipPrefixLen)) |
| .onClose(scanner::close); |
| } |
| |
| @Override |
| public Stream<Reference> getReferences() { |
| |
| Stream<TabletMetadata> tabletStream; |
| |
| if (level == DataLevel.ROOT) { |
| tabletStream = |
| Stream.of(getContext().getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS)); |
| } else { |
| tabletStream = TabletsMetadata.builder().scanTable(level.metaTable()).checkConsistency() |
| .fetch(DIR, FILES, SCANS).build(getContext()).stream(); |
| } |
| |
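      // Each tablet contributes references for its data files and for files held by active
      // scans; the tablet directory (when named) is tracked as a directory reference.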
| Stream<Reference> refStream = tabletStream.flatMap(tm -> { |
| Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream()) |
| .map(f -> new Reference(tm.getTableId(), f.getMetaUpdateDelete(), false)); |
| if (tm.getDirName() != null) { |
| refs = |
| Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDirName(), true))); |
| } |
| return refs; |
| }); |
| |
| return refStream; |
| } |
| |
| @Override |
| public Set<TableId> getTableIDs() { |
| return Tables.getIdToNameMap(getContext()).keySet(); |
| } |
| |
| @Override |
| public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException { |
| final VolumeManager fs = getContext().getVolumeManager(); |
| var metadataLocation = level == DataLevel.ROOT |
| ? getContext().getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable(); |
| |
| if (inSafeMode()) { |
| System.out.println("SAFEMODE: There are " + confirmedDeletes.size() |
| + " data file candidates marked for deletion in " + metadataLocation + ".\n" |
| + " Examine the log files to identify them.\n"); |
| log.info("SAFEMODE: Listing all data file candidates for deletion"); |
| for (String s : confirmedDeletes.values()) { |
| log.info("SAFEMODE: {}", s); |
| } |
| log.info("SAFEMODE: End candidates for deletion"); |
| return; |
| } |
| |
| List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>()); |
| |
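      // Drop candidate files that are covered by a directory that is also slated for deletion,
      // so that only the directory delete hits the namenode.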
| minimizeDeletes(confirmedDeletes, processedDeletes, fs); |
| |
| ExecutorService deleteThreadPool = |
| ThreadPools.createExecutorService(getConfiguration(), Property.GC_DELETE_THREADS); |
| |
| final List<Pair<Path,Path>> replacements = |
| ServerConstants.getVolumeReplacements(getConfiguration(), getContext().getHadoopConf()); |
| |
| for (final String delete : confirmedDeletes.values()) { |
| |
| Runnable deleteTask = () -> { |
| boolean removeFlag = false; |
| |
| try { |
| Path fullPath; |
| Path switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements); |
| if (switchedDelete != null) { |
            // Actually replacing the volumes in the metadata table would be tricky because the
            // entries would be in different rows, so it could not be done atomically in one
            // mutation, and extreme care would be needed to ensure a delete entry was not lost.
            // Instead of doing that, just handle volume switching when something needs to be
            // deleted. Since the rest of the code uses suffixes to compare delete entries, there
            // is no danger of deleting something that should not be deleted. Must not change the
            // value of the delete variable, because that is what is stored in the metadata
            // table.
| log.debug("Volume replaced {} -> {}", delete, switchedDelete); |
| fullPath = TabletFileUtil.validate(switchedDelete); |
| } else { |
| fullPath = new Path(TabletFileUtil.validate(delete)); |
| } |
| |
| for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) { |
| log.debug("Deleting {}", pathToDel); |
| |
| if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) { |
              // delete succeeded, so the candidate entry can be removed
| removeFlag = true; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.deleted; |
| } |
| } else if (fs.exists(pathToDel)) { |
| // leave the entry in the metadata; we'll try again later |
| removeFlag = false; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.errors; |
| } |
| log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel); |
| break; |
| } else { |
              // despite this failure, the file no longer exists, so still remove the metadata
              // entry
| removeFlag = true; |
| synchronized (SimpleGarbageCollector.this) { |
| ++status.current.errors; |
| } |
| String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/"); |
| if (parts.length > 2) { |
| TableId tableId = TableId.of(parts[1]); |
| String tabletDir = parts[2]; |
| getContext().getTableManager().updateTableStateCache(tableId); |
| TableState tableState = getContext().getTableManager().getTableState(tableId); |
| if (tableState != null && tableState != TableState.DELETING) { |
| // clone directories don't always exist |
| if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) { |
| log.debug("File doesn't exist: {}", pathToDel); |
| } |
| } |
| } else { |
| log.warn("Very strange path name: {}", delete); |
| } |
| } |
| } |
| |
            // for successful deletes and non-existent files, clear the candidate entry from the
            // metadata
| if (removeFlag) { |
| processedDeletes.add(delete); |
| } |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } |
| |
| }; |
| |
| deleteThreadPool.execute(deleteTask); |
| } |
| |
| deleteThreadPool.shutdown(); |
| |
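      // wait for all queued delete tasks to complete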
| try { |
| while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {} |
| } catch (InterruptedException e1) { |
| log.error("{}", e1.getMessage(), e1); |
| } |
| |
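      // remove the successfully processed candidates from the metadata so they are not picked
      // up again on the next pass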
| getContext().getAmple().deleteGcCandidates(level, processedDeletes); |
| } |
| |
| @Override |
| public void deleteTableDirIfEmpty(TableId tableID) throws IOException { |
| final VolumeManager fs = getContext().getVolumeManager(); |
      // If the directory exists and is empty, an empty array is returned.
      // Hadoop 2.0 throws a FileNotFoundException if the directory does not exist.
| for (String dir : ServerConstants.getTablesDirs(getContext())) { |
| FileStatus[] tabletDirs = null; |
| try { |
| tabletDirs = fs.listStatus(new Path(dir + "/" + tableID)); |
| } catch (FileNotFoundException ex) { |
| continue; |
| } |
| |
| if (tabletDirs.length == 0) { |
| Path p = new Path(dir + "/" + tableID); |
| log.debug("Removing table dir {}", p); |
| if (!moveToTrash(p)) { |
| fs.delete(p); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void incrementCandidatesStat(long i) { |
| status.current.candidates += i; |
| } |
| |
| @Override |
| public void incrementInUseStat(long i) { |
| status.current.inUse += i; |
| } |
| |
| @Override |
| public Iterator<Entry<String,Status>> getReplicationNeededIterator() { |
| AccumuloClient client = getContext(); |
| try { |
| Scanner s = ReplicationTable.getScanner(client); |
| StatusSection.limit(s); |
| return Iterators.transform(s.iterator(), input -> { |
| String file = input.getKey().getRow().toString(); |
| Status stat; |
| try { |
| stat = Status.parseFrom(input.getValue().get()); |
| } catch (InvalidProtocolBufferException e) { |
| log.warn("Could not deserialize protobuf for: {}", input.getKey()); |
| stat = null; |
| } |
| return Maps.immutableEntry(file, stat); |
| }); |
| } catch (ReplicationTableOfflineException e) { |
| // No elements that we need to preclude |
| return Collections.emptyIterator(); |
| } |
| } |
| } |
| |
| @Override |
| @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit") |
| public void run() { |
| final VolumeManager fs = getContext().getVolumeManager(); |
| |
    // Sleep for an initial period, giving the manager time to start up and
    // old data files time to become unreferenced
| log.info("Trying to acquire ZooKeeper lock for garbage collector"); |
| |
| try { |
| getZooLock(startStatsService()); |
| } catch (Exception ex) { |
| log.error("{}", ex.getMessage(), ex); |
| System.exit(1); |
| } |
| |
| try { |
| long delay = getStartDelay(); |
| log.debug("Sleeping for {} milliseconds before beginning garbage collection cycles", delay); |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| log.warn("{}", e.getMessage(), e); |
| return; |
| } |
| |
| ProbabilitySampler sampler = |
| TraceUtil.probabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT)); |
| |
    // This is created outside of the run loop and passed to the walogCollector so that
    // only a single timed task is created (internal to LiveTServerSet, which uses SimpleTimer).
| final LiveTServerSet liveTServerSet = |
| new LiveTServerSet(getContext(), (current, deleted, added) -> { |
| log.debug("Number of current servers {}, tservers added {}, removed {}", |
| current == null ? -1 : current.size(), added, deleted); |
| |
| if (log.isTraceEnabled()) { |
| log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted); |
| } |
| }); |
| |
| while (true) { |
| try (TraceScope gcOuterSpan = Trace.startSpan("gc", sampler)) { |
| try (TraceScope gcSpan = Trace.startSpan("loop")) { |
| final long tStart = System.nanoTime(); |
| try { |
| System.gc(); // make room |
| |
| status.current.started = System.currentTimeMillis(); |
| |
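            // Collect each data level separately: root tablet metadata lives in ZooKeeper,
            // metadata table references live in the root table, and user table references live
            // in the metadata table.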
| new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.ROOT)); |
| new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.METADATA)); |
| new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.USER)); |
| |
| log.info("Number of data file candidates for deletion: {}", status.current.candidates); |
| log.info("Number of data file candidates still in use: {}", status.current.inUse); |
| log.info("Number of successfully deleted data files: {}", status.current.deleted); |
| log.info("Number of data files delete failures: {}", status.current.errors); |
| |
| status.current.finished = System.currentTimeMillis(); |
| status.last = status.current; |
| gcCycleMetrics.setLastCollect(status.current); |
| status.current = new GcCycleStats(); |
| |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } |
| |
| final long tStop = System.nanoTime(); |
| log.info(String.format("Collect cycle took %.2f seconds", |
| (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); |
| |
| /* |
| * We want to prune references to fully-replicated WALs from the replication table which |
| * are no longer referenced in the metadata table before running |
| * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. |
| */ |
| try (TraceScope replSpan = Trace.startSpan("replicationClose")) { |
| CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(getContext()); |
| closeWals.run(); |
| } catch (Exception e) { |
| log.error("Error trying to close write-ahead logs for replication table", e); |
| } |
| |
| // Clean up any unused write-ahead logs |
| try (TraceScope waLogs = Trace.startSpan("walogs")) { |
| GarbageCollectWriteAheadLogs walogCollector = |
| new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash()); |
| log.info("Beginning garbage collection of write-ahead logs"); |
| walogCollector.collect(status); |
| gcCycleMetrics.setLastWalCollect(status.lastLog); |
| } catch (Exception e) { |
| log.error("{}", e.getMessage(), e); |
| } |
| } |
| |
| // we just made a lot of metadata changes: flush them out |
| try { |
| AccumuloClient accumuloClient = getContext(); |
| |
| final long actionStart = System.nanoTime(); |
| |
| String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); |
| log.debug("gc post action {} started", action); |
| |
| switch (action) { |
| case "compact": |
| accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true); |
| accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true); |
| break; |
| case "flush": |
| accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true); |
| accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true); |
| break; |
| default: |
| log.trace("\'none - no action\' or invalid value provided: {}", action); |
| } |
| |
| final long actionComplete = System.nanoTime(); |
| |
| gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); |
| |
| log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", |
| (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); |
| |
| } catch (Exception e) { |
| log.warn("{}", e.getMessage(), e); |
| } |
| } |
| try { |
| gcCycleMetrics.incrementRunCycleCount(); |
| long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); |
| log.debug("Sleeping for {} milliseconds", gcDelay); |
| Thread.sleep(gcDelay); |
| } catch (InterruptedException e) { |
| log.warn("{}", e.getMessage(), e); |
| return; |
| } |
| } |
| } |
| |
| /** |
| * Moves a file to trash. If this garbage collector is not using trash, this method returns false |
| * and leaves the file alone. If the file is missing, this method returns false as opposed to |
| * throwing an exception. |
| * |
| * @return true if the file was moved to trash |
| * @throws IOException |
| * if the volume manager encountered a problem |
| */ |
| boolean moveToTrash(Path path) throws IOException { |
| final VolumeManager fs = getContext().getVolumeManager(); |
| if (!isUsingTrash()) { |
| return false; |
| } |
| try { |
| return fs.moveToTrash(path); |
| } catch (FileNotFoundException ex) { |
| return false; |
| } |
| } |
| |
| private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { |
| String path = getContext().getZooKeeperRoot() + Constants.ZGC_LOCK; |
| |
| LockWatcher lockWatcher = new LockWatcher() { |
| @Override |
| public void lostLock(LockLossReason reason) { |
| Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); |
| } |
| |
| @Override |
| public void unableToMonitorLockNode(final Exception e) { |
| // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility |
| Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); |
| |
| } |
| }; |
| |
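    // A single UUID is used across retries so that a lock node created by a previous attempt
    // from this process can be recognized as our own.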
| UUID zooLockUUID = UUID.randomUUID(); |
| while (true) { |
| lock = new ZooLock(getContext().getSiteConfiguration(), path, zooLockUUID); |
| if (lock.tryLock(lockWatcher, |
| new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) { |
| log.debug("Got GC ZooKeeper lock"); |
| return; |
| } |
| log.debug("Failed to get GC ZooKeeper lock, will retry"); |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| } |
| |
| private HostAndPort startStatsService() { |
| Iface rpcProxy = TraceUtil.wrapService(this); |
| final Processor<Iface> processor; |
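    // For SASL, wrap the processor so the effective credentials of the calling user are
    // validated and updated for each RPC.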
| if (getContext().getThriftServerType() == ThriftServerType.SASL) { |
| Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); |
| processor = new Processor<>(tcProxy); |
| } else { |
| processor = new Processor<>(rpcProxy); |
| } |
| IntStream port = getConfiguration().getPortStream(Property.GC_PORT); |
| HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), port); |
| long maxMessageSize = getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE); |
| try { |
| ServerAddress server = TServerUtils.startTServer(getMetricsSystem(), getConfiguration(), |
| getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), |
| "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, |
| getContext().getServerSslParams(), getContext().getSaslParams(), 0, addresses); |
| log.debug("Starting garbage collector listening on " + server.address); |
| return server.address; |
| } catch (Exception ex) { |
| // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility |
| log.error("FATAL:", ex); |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * Checks if the given string is a directory. |
| * |
| * @param delete |
| * possible directory |
| * @return true if string is a directory |
| */ |
| static boolean isDir(String delete) { |
| if (delete == null) { |
| return false; |
| } |
| |
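    // A relative directory candidate has the form <tableId>/<tabletDir> (one slash); a file
    // candidate has the form <tableId>/<tabletDir>/<file> (two slashes).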
| int slashCount = 0; |
| for (int i = 0; i < delete.length(); i++) { |
| if (delete.charAt(i) == '/') { |
| slashCount++; |
| } |
| } |
| return slashCount == 1; |
| } |
| |
| @VisibleForTesting |
| static void minimizeDeletes(SortedMap<String,String> confirmedDeletes, |
| List<String> processedDeletes, VolumeManager fs) { |
| Set<Path> seenVolumes = new HashSet<>(); |
| |
    // When deleting a directory and all files in that directory, only the directory needs to be
    // deleted. The directory sorts right before its files, so remove the files in this case to
    // minimize namenode operations.
| Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator(); |
| |
| String lastDirRel = null; |
| Path lastDirAbs = null; |
| while (cdIter.hasNext()) { |
| Entry<String,String> entry = cdIter.next(); |
| String relPath = entry.getKey(); |
| Path absPath = new Path(entry.getValue()); |
| |
| if (isDir(relPath)) { |
| lastDirRel = relPath; |
| lastDirAbs = absPath; |
| } else if (lastDirRel != null) { |
| if (relPath.startsWith(lastDirRel)) { |
| Path vol = FileType.TABLE.getVolume(absPath); |
| |
| boolean sameVol = false; |
| |
| if (GcVolumeUtil.isAllVolumesUri(lastDirAbs)) { |
| if (seenVolumes.contains(vol)) { |
| sameVol = true; |
| } else { |
| for (Volume cvol : fs.getVolumes()) { |
| if (cvol.containsPath(vol)) { |
| seenVolumes.add(vol); |
| sameVol = true; |
| } |
| } |
| } |
| } else { |
| sameVol = FileType.TABLE.getVolume(lastDirAbs).equals(vol); |
| } |
| |
| if (sameVol) { |
| log.info("Ignoring {} because {} exist", entry.getValue(), lastDirAbs); |
| processedDeletes.add(entry.getValue()); |
| cdIter.remove(); |
| } |
| } else { |
| lastDirRel = null; |
| lastDirAbs = null; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public GCStatus getStatus(TInfo info, TCredentials credentials) { |
| return status; |
| } |
| |
| public GcCycleMetrics getGcCycleMetrics() { |
| return gcCycleMetrics; |
| } |
| } |