/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.gc;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TabletFileUtil;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.gc.metrics.GcCycleMetrics;
import org.apache.accumulo.gc.metrics.GcMetricsFactory;
import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.gc.GcVolumeUtil;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.htrace.impl.ProbabilitySampler;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

// Could/Should implement HighlyAvailableService but the Thrift server is already started before
// the ZK lock is acquired. The server is only for metrics, so there are no concerns about clients
// using the service before the lock is acquired.
public class SimpleGarbageCollector extends AbstractServer implements Iface {

  private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollector.class);

  private ZooLock lock;

  private GCStatus status =
      new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());

  private GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();

  /**
   * Command line entry point: builds the garbage collector from the given arguments and runs it
   * as a server, closing it when the server returns or fails.
   *
   * @param args
   *          command line arguments passed through to the server
   * @throws Exception
   *           if construction, running or closing the server fails
   */
  public static void main(String[] args) throws Exception {
    final ServerOpts serverOpts = new ServerOpts();
    try (SimpleGarbageCollector collector = new SimpleGarbageCollector(serverOpts, args)) {
      collector.runServer();
    }
  }

  /**
   * Creates the garbage collector server, registers the GC metrics module and logs the effective
   * garbage collection settings.
   *
   * @param opts
   *          server options handed to the base server
   * @param args
   *          command line arguments handed to the base server
   */
  SimpleGarbageCollector(ServerOpts opts, String[] args) {
    super("gc", opts, args);

    final AccumuloConfiguration conf = getConfiguration();

    if (new GcMetricsFactory(conf).register(this)) {
      log.info("gc metrics modules registered with metrics system");
    } else {
      // Not fatal for collection itself, but a failed registration should stand out in the logs
      // instead of being reported at the same level as normal startup messages.
      log.warn("Failed to register gc metrics module");
    }

    final long gcDelay = conf.getTimeInMillis(Property.GC_CYCLE_DELAY);
    final String useFullCompaction = conf.get(Property.GC_USE_FULL_COMPACTION);

    log.info("start delay: {} milliseconds", getStartDelay());
    log.info("time delay: {} milliseconds", gcDelay);
    log.info("safemode: {}", inSafeMode());
    log.info("candidate batch size: {} bytes", getCandidateBatchSize());
    log.info("delete threads: {}", getNumDeleteThreads());
    log.info("gc post metadata action: {}", useFullCompaction);
  }

  /**
   * Reads the configured delay that precedes the first collection cycle.
   *
   * @return start delay, in milliseconds
   */
  long getStartDelay() {
    final AccumuloConfiguration conf = getConfiguration();
    return conf.getTimeInMillis(Property.GC_CYCLE_START);
  }

  /**
   * Tells whether files should be moved to the trash instead of being deleted outright. Trash is
   * used unless the "ignore trash" property is set.
   *
   * @return true if trash is used
   */
  boolean isUsingTrash() {
    final boolean ignoreTrash = getConfiguration().getBoolean(Property.GC_TRASH_IGNORE);
    return !ignoreTrash;
  }

  /**
   * Reads the configured number of threads used for deleting files.
   *
   * @return number of delete threads
   */
  int getNumDeleteThreads() {
    final AccumuloConfiguration conf = getConfiguration();
    return conf.getCount(Property.GC_DELETE_THREADS);
  }

  /**
   * Reads the configured batch size used when gathering garbage collection candidates.
   *
   * @return candidate batch size, in bytes
   */
  long getCandidateBatchSize() {
    final AccumuloConfiguration conf = getConfiguration();
    return conf.getAsBytes(Property.GC_CANDIDATE_BATCH_SIZE);
  }

  /**
   * Checks if safemode is set. In safemode, candidates are only reported and files are not
   * deleted.
   *
   * @return true if safemode is enabled
   */
  boolean inSafeMode() {
    final AccumuloConfiguration conf = getConfiguration();
    return conf.getBoolean(Property.GC_SAFEMODE);
  }

  private class GCEnv implements GarbageCollectionEnvironment {

    // Data level whose candidates and references this environment reads; fixed at construction.
    private final DataLevel level;

    /**
     * Creates an environment bound to a single data level.
     *
     * @param level
     *          the data level this environment operates on
     */
    GCEnv(Ample.DataLevel level) {
      this.level = level;
    }

    /**
     * Fills {@code result} with delete candidates for this level, starting at the given continue
     * point, and stops early once the gathered candidates exceed the batch threshold.
     *
     * @param continuePoint
     *          position handed to the candidate lookup to resume from
     * @param result
     *          list that is cleared and then populated with the gathered candidates
     * @return true if gathering stopped because the batch threshold was exceeded, false if the
     *         candidate iterator was exhausted
     */
    @Override
    public boolean getCandidates(String continuePoint, List<String> result)
        throws TableNotFoundException {

      final Iterator<String> remaining =
          getContext().getAmple().getGcCandidates(level, continuePoint);
      // The configured size is in bytes; halve it to approximate a number of characters.
      final long maxChars = getCandidateBatchSize() / 2;
      long gatheredChars = 0;

      result.clear();

      while (remaining.hasNext()) {
        final String next = remaining.next();
        gatheredChars += next.length();
        result.add(next);
        if (gatheredChars <= maxChars) {
          continue;
        }
        log.info(
            "Candidate batch of size {} has exceeded the"
                + " threshold. Attempting to delete what has been gathered so far.",
            gatheredChars);
        return true;
      }
      return false;
    }

    /**
     * Streams the rows of the blip section of this level's metadata table with the section's row
     * prefix stripped. The root level yields an empty stream. Closing the returned stream closes
     * the underlying scanner.
     *
     * @return stream of blip section rows without the row prefix
     */
    @Override
    public Stream<String> getBlipPaths() throws TableNotFoundException {

      if (level == DataLevel.ROOT) {
        return Stream.empty();
      }

      final int prefixLength = BlipSection.getRowPrefix().length();
      final Scanner metaScanner =
          getContext().createScanner(level.metaTable(), Authorizations.EMPTY);
      final IsolatedScanner isolated = new IsolatedScanner(metaScanner);
      isolated.setRange(BlipSection.getRange());

      Stream<String> rows = StreamSupport.stream(isolated.spliterator(), false)
          .map(entry -> entry.getKey().getRow().toString());
      return rows.map(row -> row.substring(prefixLength)).onClose(isolated::close);
    }

    /**
     * Streams every reference held by the tablets of this level: one non-directory reference per
     * file and per scan entry, plus one directory reference for each tablet that has a directory
     * name. The root level reads the single root tablet; other levels scan the level's metadata
     * table.
     *
     * @return stream of references found in the tablet metadata
     */
    @Override
    public Stream<Reference> getReferences() {

      final Stream<TabletMetadata> tablets;

      if (level != DataLevel.ROOT) {
        tablets = TabletsMetadata.builder().scanTable(level.metaTable()).checkConsistency()
            .fetch(DIR, FILES, SCANS).build(getContext()).stream();
      } else {
        tablets =
            Stream.of(getContext().getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS));
      }

      return tablets.flatMap(tablet -> {
        Stream<Reference> fileRefs =
            Stream.concat(tablet.getFiles().stream(), tablet.getScans().stream()).map(
                file -> new Reference(tablet.getTableId(), file.getMetaUpdateDelete(), false));
        if (tablet.getDirName() == null) {
          return fileRefs;
        }
        Reference dirRef = new Reference(tablet.getTableId(), tablet.getDirName(), true);
        return Stream.concat(fileRefs, Stream.of(dirRef));
      });
    }

    /**
     * Looks up the ids of the tables currently present in the id-to-name mapping.
     *
     * @return the key set of the table id to table name map
     */
    @Override
    public Set<TableId> getTableIDs() {
      final var idToName = Tables.getIdToNameMap(getContext());
      return idToName.keySet();
    }

    /**
     * Deletes the confirmed candidates from the file system and then removes the processed
     * candidate entries through {@code Ample.deleteGcCandidates}.
     *
     * <p>
     * In safemode nothing is deleted: a summary is printed to standard out, every candidate is
     * logged, and the method returns. Otherwise {@link #minimizeDeletes} first drops file entries
     * that are covered by a directory delete, and each remaining candidate is handed to a thread
     * pool that moves the path to trash or deletes it recursively.
     *
     * @param confirmedDeletes
     *          candidates to delete; the values are the candidate strings that get deleted and
     *          reported as processed, the keys are what {@link #minimizeDeletes} treats as
     *          relative paths. Entries may be removed from this map by {@link #minimizeDeletes}.
     */
    @Override
    public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException {
      final VolumeManager fs = getContext().getVolumeManager();
      // only used for the safemode message below
      var metadataLocation = level == DataLevel.ROOT
          ? getContext().getZooKeeperRoot() + " for " + RootTable.NAME : level.metaTable();

      if (inSafeMode()) {
        System.out.println("SAFEMODE: There are " + confirmedDeletes.size()
            + " data file candidates marked for deletion in " + metadataLocation + ".\n"
            + "          Examine the log files to identify them.\n");
        log.info("SAFEMODE: Listing all data file candidates for deletion");
        for (String s : confirmedDeletes.values()) {
          log.info("SAFEMODE: {}", s);
        }
        log.info("SAFEMODE: End candidates for deletion");
        return;
      }

      // written to by the delete tasks running in the pool, hence the synchronized wrapper
      List<String> processedDeletes = Collections.synchronizedList(new ArrayList<>());

      minimizeDeletes(confirmedDeletes, processedDeletes, fs);

      ExecutorService deleteThreadPool =
          ThreadPools.createExecutorService(getConfiguration(), Property.GC_DELETE_THREADS);

      final List<Pair<Path,Path>> replacements =
          ServerConstants.getVolumeReplacements(getConfiguration(), getContext().getHadoopConf());

      for (final String delete : confirmedDeletes.values()) {

        Runnable deleteTask = () -> {
          // whether the candidate entry for this delete should be removed afterwards
          boolean removeFlag = false;

          try {
            Path fullPath;
            Path switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements);
            if (switchedDelete != null) {
              // actually replacing the volumes in the metadata table would be tricky because the
              // entries would be different rows. So it could not be
              // atomically in one mutation and extreme care would need to be taken that delete
              // entry was not lost. Instead of doing that, just deal with
              // volume switching when something needs to be deleted. Since the rest of the code
              // uses suffixes to compare delete entries, there is no danger
              // of deleting something that should not be deleted. Must not change value of delete
              // variable because that's what is stored in the metadata table.
              log.debug("Volume replaced {} -> {}", delete, switchedDelete);
              fullPath = TabletFileUtil.validate(switchedDelete);
            } else {
              fullPath = new Path(TabletFileUtil.validate(delete));
            }

            // one candidate may expand to several paths; the flag reflects the last path handled,
            // except that the loop stops at the first path that exists but could not be deleted
            for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, fullPath)) {
              log.debug("Deleting {}", pathToDel);

              if (moveToTrash(pathToDel) || fs.deleteRecursively(pathToDel)) {
                // delete succeeded, still want to delete
                removeFlag = true;
                synchronized (SimpleGarbageCollector.this) {
                  ++status.current.deleted;
                }
              } else if (fs.exists(pathToDel)) {
                // leave the entry in the metadata; we'll try again later
                removeFlag = false;
                synchronized (SimpleGarbageCollector.this) {
                  ++status.current.errors;
                }
                log.warn("File exists, but was not deleted for an unknown reason: {}", pathToDel);
                break;
              } else {
                // this failure, we still want to remove the metadata entry
                removeFlag = true;
                synchronized (SimpleGarbageCollector.this) {
                  ++status.current.errors;
                }
                // NOTE(review): indexing [1] assumes the path contains Constants.ZTABLES; a path
                // without it would throw here and end up in the catch block below - confirm
                String[] parts = pathToDel.toString().split(Constants.ZTABLES)[1].split("/");
                if (parts.length > 2) {
                  TableId tableId = TableId.of(parts[1]);
                  String tabletDir = parts[2];
                  getContext().getTableManager().updateTableStateCache(tableId);
                  TableState tableState = getContext().getTableManager().getTableState(tableId);
                  if (tableState != null && tableState != TableState.DELETING) {
                    // clone directories don't always exist
                    if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) {
                      log.debug("File doesn't exist: {}", pathToDel);
                    }
                  }
                } else {
                  log.warn("Very strange path name: {}", delete);
                }
              }
            }

            // proceed to clearing out the flags for successful deletes and
            // non-existent files
            if (removeFlag) {
              processedDeletes.add(delete);
            }
          } catch (Exception e) {
            // the candidate is not added to processedDeletes, so its entry is left in place
            log.error("{}", e.getMessage(), e);
          }

        };

        deleteThreadPool.execute(deleteTask);
      }

      deleteThreadPool.shutdown();

      try {
        // poll until every submitted delete task has finished
        while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {}
      } catch (InterruptedException e1) {
        // NOTE(review): the interrupt is only logged (interrupt status is not restored) and
        // execution continues below with whatever has been processed so far
        log.error("{}", e1.getMessage(), e1);
      }

      getContext().getAmple().deleteGcCandidates(level, processedDeletes);
    }

    /**
     * Removes the directory of the given table from every tables directory in which it exists
     * and is empty. Directories that do not exist are skipped. Removal goes through the trash
     * when possible and falls back to a plain delete.
     *
     * @param tableID
     *          id of the table whose directories are checked
     * @throws IOException
     *           if listing or removing a directory fails
     */
    @Override
    public void deleteTableDirIfEmpty(TableId tableID) throws IOException {
      final VolumeManager volumeManager = getContext().getVolumeManager();
      for (String tablesDir : ServerConstants.getTablesDirs(getContext())) {
        final String tableDirName = tablesDir + "/" + tableID;
        FileStatus[] children;
        try {
          // an existing but empty dir yields an empty array;
          // hadoop 2.0 throws an exception if the dir does not exist
          children = volumeManager.listStatus(new Path(tableDirName));
        } catch (FileNotFoundException missing) {
          continue;
        }

        if (children.length != 0) {
          continue;
        }

        final Path emptyTableDir = new Path(tableDirName);
        log.debug("Removing table dir {}", emptyTableDir);
        if (!moveToTrash(emptyTableDir)) {
          volumeManager.delete(emptyTableDir);
        }
      }
    }

    /**
     * Adds the given amount to the candidate count of the current cycle's statistics.
     *
     * @param i
     *          amount to add
     */
    @Override
    public void incrementCandidatesStat(long i) {
      final GcCycleStats currentCycle = status.current;
      currentCycle.candidates += i;
    }

    /**
     * Adds the given amount to the in-use count of the current cycle's statistics.
     *
     * @param i
     *          amount to add
     */
    @Override
    public void incrementInUseStat(long i) {
      final GcCycleStats currentCycle = status.current;
      currentCycle.inUse += i;
    }

    /**
     * Iterates over the status section of the replication table, pairing each row (as a string)
     * with its parsed {@link Status}. A value that cannot be parsed is logged and paired with
     * {@code null}. If the replication table is offline, an empty iterator is returned.
     *
     * @return iterator over (row, status) entries
     */
    @Override
    public Iterator<Entry<String,Status>> getReplicationNeededIterator() {
      Scanner replScanner;
      try {
        replScanner = ReplicationTable.getScanner(getContext());
      } catch (ReplicationTableOfflineException offline) {
        // No elements that we need to preclude
        return Collections.emptyIterator();
      }

      StatusSection.limit(replScanner);
      return Iterators.transform(replScanner.iterator(), kv -> {
        final String row = kv.getKey().getRow().toString();
        Status parsed = null;
        try {
          parsed = Status.parseFrom(kv.getValue().get());
        } catch (InvalidProtocolBufferException e) {
          log.warn("Could not deserialize protobuf for: {}", kv.getKey());
        }
        return Maps.immutableEntry(row, parsed);
      });
    }
  }

  /**
   * Main loop of the garbage collector. Starts the stats service, acquires the GC ZooKeeper lock
   * (exiting the JVM if that fails), waits for the configured start delay, and then repeats
   * forever: collect data files for the root, metadata and user levels, close replication WAL
   * references, collect write-ahead logs, run the configured post action on the metadata and root
   * tables, and sleep for the configured cycle delay. The method returns only when one of the
   * sleeps is interrupted.
   */
  @Override
  @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
  public void run() {
    final VolumeManager fs = getContext().getVolumeManager();

    // Start the stats service, then block until the GC lock in ZooKeeper has been acquired
    log.info("Trying to acquire ZooKeeper lock for garbage collector");

    try {
      getZooLock(startStatsService());
    } catch (Exception ex) {
      log.error("{}", ex.getMessage(), ex);
      System.exit(1);
    }

    // Sleep for an initial period, giving the manager time to start up and
    // old data files to be unused
    try {
      long delay = getStartDelay();
      log.debug("Sleeping for {} milliseconds before beginning garbage collection cycles", delay);
      Thread.sleep(delay);
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored before returning
      log.warn("{}", e.getMessage(), e);
      return;
    }

    ProbabilitySampler sampler =
        TraceUtil.probabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));

    // This is created outside of the run loop and passed to the walogCollector so that
    // only a single timed task is created (internal to LiveTServerSet using SimpleTimer).
    final LiveTServerSet liveTServerSet =
        new LiveTServerSet(getContext(), (current, deleted, added) -> {
          log.debug("Number of current servers {}, tservers added {}, removed {}",
              current == null ? -1 : current.size(), added, deleted);

          if (log.isTraceEnabled()) {
            log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted);
          }
        });

    while (true) {
      try (TraceScope gcOuterSpan = Trace.startSpan("gc", sampler)) {
        try (TraceScope gcSpan = Trace.startSpan("loop")) {
          final long tStart = System.nanoTime();
          try {
            System.gc(); // make room

            status.current.started = System.currentTimeMillis();

            // collect each data level in turn: root, then metadata, then user tables
            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.ROOT));
            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.METADATA));
            new GarbageCollectionAlgorithm().collect(new GCEnv(DataLevel.USER));

            log.info("Number of data file candidates for deletion: {}", status.current.candidates);
            log.info("Number of data file candidates still in use: {}", status.current.inUse);
            log.info("Number of successfully deleted data files: {}", status.current.deleted);
            log.info("Number of data files delete failures: {}", status.current.errors);

            // publish the finished cycle as "last" and start a fresh "current"
            status.current.finished = System.currentTimeMillis();
            status.last = status.current;
            gcCycleMetrics.setLastCollect(status.current);
            status.current = new GcCycleStats();

          } catch (Exception e) {
            // a failed cycle is logged; the stats above are not rolled over in that case
            log.error("{}", e.getMessage(), e);
          }

          final long tStop = System.nanoTime();
          log.info(String.format("Collect cycle took %.2f seconds",
              (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));

          /*
           * We want to prune references to fully-replicated WALs from the replication table which
           * are no longer referenced in the metadata table before running
           * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible.
           */
          try (TraceScope replSpan = Trace.startSpan("replicationClose")) {
            CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(getContext());
            closeWals.run();
          } catch (Exception e) {
            log.error("Error trying to close write-ahead logs for replication table", e);
          }

          // Clean up any unused write-ahead logs
          try (TraceScope waLogs = Trace.startSpan("walogs")) {
            GarbageCollectWriteAheadLogs walogCollector =
                new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash());
            log.info("Beginning garbage collection of write-ahead logs");
            walogCollector.collect(status);
            gcCycleMetrics.setLastWalCollect(status.lastLog);
          } catch (Exception e) {
            log.error("{}", e.getMessage(), e);
          }
        }

        // we just made a lot of metadata changes: flush them out
        try {
          AccumuloClient accumuloClient = getContext();

          final long actionStart = System.nanoTime();

          // the action is re-read from the configuration on every cycle
          String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
          log.debug("gc post action {} started", action);

          switch (action) {
            case "compact":
              accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
              accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true);
              break;
            case "flush":
              accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true);
              accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true);
              break;
            default:
              log.trace("\'none - no action\' or invalid value provided: {}", action);
          }

          final long actionComplete = System.nanoTime();

          gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);

          log.info("gc post action {} completed in {} seconds", action, String.format("%.2f",
              (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0)));

        } catch (Exception e) {
          log.warn("{}", e.getMessage(), e);
        }
      }
      try {
        gcCycleMetrics.incrementRunCycleCount();
        // the delay is re-read from the configuration on every cycle
        long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
        log.debug("Sleeping for {} milliseconds", gcDelay);
        Thread.sleep(gcDelay);
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is not restored before returning
        log.warn("{}", e.getMessage(), e);
        return;
      }
    }
  }

  /**
   * Attempts to move the given path to the trash. When trash is not in use the path is left
   * untouched and false is returned. A missing path also results in false rather than an
   * exception.
   *
   * @param path
   *          path to move to the trash
   * @return true if the file was moved to trash
   * @throws IOException
   *           if the volume manager encountered a problem
   */
  boolean moveToTrash(Path path) throws IOException {
    final VolumeManager volumeManager = getContext().getVolumeManager();
    if (isUsingTrash()) {
      try {
        return volumeManager.moveToTrash(path);
      } catch (FileNotFoundException missing) {
        // nothing to move; report it the same way as "not moved"
        return false;
      }
    }
    return false;
  }

  /**
   * Blocks until the garbage collector lock in ZooKeeper has been obtained, retrying once per
   * second. The lock data is the serialized {@link ServerServices} entry for the given address.
   * If the lock is later lost, or the lock node can no longer be monitored, the process is
   * halted by the registered watcher.
   *
   * @param addr
   *          address of the GC client service, stored as the lock data
   * @throws KeeperException
   *           if ZooKeeper reports an error while trying to lock
   * @throws InterruptedException
   *           if interrupted while trying to lock
   */
  private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
    String path = getContext().getZooKeeperRoot() + Constants.ZGC_LOCK;

    LockWatcher lockWatcher = new LockWatcher() {
      @Override
      public void lostLock(LockLossReason reason) {
        Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
      }

      @Override
      public void unableToMonitorLockNode(final Exception e) {
        // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility
        Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e));

      }
    };

    // Encode with an explicit charset so the bytes stored in ZooKeeper do not depend on the
    // platform default encoding of the host running the collector. The data is the same for
    // every attempt, so build it once.
    final byte[] lockData = new ServerServices(addr.toString(), Service.GC_CLIENT).toString()
        .getBytes(StandardCharsets.UTF_8);

    UUID zooLockUUID = UUID.randomUUID();
    while (true) {
      lock = new ZooLock(getContext().getSiteConfiguration(), path, zooLockUUID);
      if (lock.tryLock(lockWatcher, lockData)) {
        log.debug("Got GC ZooKeeper lock");
        return;
      }
      log.debug("Failed to get GC ZooKeeper lock, will retry");
      sleepUninterruptibly(1, TimeUnit.SECONDS);
    }
  }

  /**
   * Starts the Thrift server that exposes this collector's monitor service and returns the
   * address it is listening on. With SASL the handler is additionally wrapped so credentials are
   * updated per call.
   *
   * @return address the service was started on
   * @throws RuntimeException
   *           wrapping any failure to start the Thrift server
   */
  private HostAndPort startStatsService() {
    Iface handler = TraceUtil.wrapService(this);
    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
      handler = TCredentialsUpdatingWrapper.service(handler, getClass(), getConfiguration());
    }
    final Processor<Iface> processor = new Processor<>(handler);

    final IntStream candidatePorts = getConfiguration().getPortStream(Property.GC_PORT);
    final HostAndPort[] bindAddresses =
        TServerUtils.getHostAndPorts(getHostname(), candidatePorts);
    final long maxMessageSize =
        getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);

    final ServerAddress started;
    try {
      started = TServerUtils.startTServer(getMetricsSystem(), getConfiguration(),
          getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
          "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
          getContext().getServerSslParams(), getContext().getSaslParams(), 0, bindAddresses);
      log.debug("Starting garbage collector listening on " + started.address);
      return started.address;
    } catch (Exception ex) {
      // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility
      log.error("FATAL:", ex);
      throw new RuntimeException(ex);
    }
  }

  /**
   * Checks if the given string denotes a directory. A string is treated as a directory when it
   * contains exactly one {@code '/'} character.
   *
   * @param delete
   *          possible directory, may be null
   * @return true if string is a directory, false otherwise (including for null)
   */
  static boolean isDir(String delete) {
    if (delete == null) {
      return false;
    }

    // exactly one slash <=> a slash exists and the first and last occurrence coincide
    final int firstSlash = delete.indexOf('/');
    return firstSlash >= 0 && firstSlash == delete.lastIndexOf('/');
  }

  /**
   * Drops file candidates that are already covered by a directory candidate, to minimize
   * namenode operations: when a directory and files inside it are both up for deletion, only the
   * directory needs to be deleted. This relies on the directory sorting right before its files.
   * A dropped file is removed from {@code confirmedDeletes} and its value is added to
   * {@code processedDeletes}.
   *
   * @param confirmedDeletes
   *          candidates keyed by relative path with the absolute path as value; modified in place
   * @param processedDeletes
   *          receives the values of the candidates that were dropped
   * @param fs
   *          volume manager whose volumes are consulted when the directory spans all volumes
   */
  @VisibleForTesting
  static void minimizeDeletes(SortedMap<String,String> confirmedDeletes,
      List<String> processedDeletes, VolumeManager fs) {
    final Set<Path> knownVolumes = new HashSet<>();

    // most recently seen directory candidate, or null when none is active
    String activeDirRel = null;
    Path activeDirAbs = null;

    final Iterator<Entry<String,String>> candidates = confirmedDeletes.entrySet().iterator();
    while (candidates.hasNext()) {
      final Entry<String,String> candidate = candidates.next();
      final String relative = candidate.getKey();
      final Path absolute = new Path(candidate.getValue());

      if (isDir(relative)) {
        activeDirRel = relative;
        activeDirAbs = absolute;
        continue;
      }
      if (activeDirRel == null) {
        continue;
      }
      if (!relative.startsWith(activeDirRel)) {
        // moved past the files of the active directory
        activeDirRel = null;
        activeDirAbs = null;
        continue;
      }

      final Path fileVolume = FileType.TABLE.getVolume(absolute);
      boolean coveredByDir = false;

      if (!GcVolumeUtil.isAllVolumesUri(activeDirAbs)) {
        coveredByDir = FileType.TABLE.getVolume(activeDirAbs).equals(fileVolume);
      } else if (knownVolumes.contains(fileVolume)) {
        coveredByDir = true;
      } else {
        for (Volume configured : fs.getVolumes()) {
          if (configured.containsPath(fileVolume)) {
            knownVolumes.add(fileVolume);
            coveredByDir = true;
          }
        }
      }

      if (coveredByDir) {
        log.info("Ignoring {} because {} exist", candidate.getValue(), activeDirAbs);
        processedDeletes.add(candidate.getValue());
        candidates.remove();
      }
    }
  }

  /**
   * Returns the status object maintained by this collector. The arguments are not used.
   *
   * @param info
   *          trace information (unused)
   * @param credentials
   *          caller credentials (unused)
   * @return the collector's current status object
   */
  @Override
  public GCStatus getStatus(TInfo info, TCredentials credentials) {
    return this.status;
  }

  /**
   * Returns the cycle metrics holder that this collector updates while running.
   *
   * @return the collector's cycle metrics object
   */
  public GcCycleMetrics getGcCycleMetrics() {
    return this.gcCycleMetrics;
  }
}
