/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.gc;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.gc.metrics.GcCycleMetrics;
import org.apache.accumulo.gc.metrics.GcMetrics;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

// This class could implement HighlyAvailableService, but the Thrift server is started before the
// ZooKeeper lock is acquired. Since the server is only used for metrics, there is no concern about
// clients using the service before the lock is acquired.
public class SimpleGarbageCollector extends AbstractServer implements Iface {
private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollector.class);
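// Aggregate statistics for the current and most recent collection cycles; these are reported
// through the GCMonitorService Thrift interface (see getStatus).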
private final GCStatus status =
new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
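// Logs the effective GC configuration (start and cycle delays, safe mode, candidate batch size,
// delete threads, post-metadata action) at startup.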
SimpleGarbageCollector(ServerOpts opts, String[] args) {
super("gc", opts, args);
final AccumuloConfiguration conf = getConfiguration();
final long gcDelay = conf.getTimeInMillis(Property.GC_CYCLE_DELAY);
final String useFullCompaction = conf.get(Property.GC_USE_FULL_COMPACTION);
log.info("start delay: {} milliseconds", getStartDelay());
log.info("time delay: {} milliseconds", gcDelay);
log.info("safemode: {}", inSafeMode());
log.info("candidate batch size: {} bytes", getCandidateBatchSize());
log.info("delete threads: {}", getNumDeleteThreads());
log.info("gc post metadata action: {}", useFullCompaction);
}
public static void main(String[] args) throws Exception {
try (SimpleGarbageCollector gc = new SimpleGarbageCollector(new ServerOpts(), args)) {
gc.runServer();
}
}
/**
* Gets the delay before the first collection.
*
* @return start delay, in milliseconds
*/
long getStartDelay() {
return getConfiguration().getTimeInMillis(Property.GC_CYCLE_START);
}
/**
* Checks if the volume manager should move files to the trash rather than delete them.
*
* @return true if trash is used
*/
boolean isUsingTrash() {
@SuppressWarnings("removal")
Property p = Property.GC_TRASH_IGNORE;
return !getConfiguration().getBoolean(p);
}
/**
* Gets the number of threads used for deleting files.
*
* @return number of delete threads
*/
int getNumDeleteThreads() {
return getConfiguration().getCount(Property.GC_DELETE_THREADS);
}
/**
* Gets the batch size, in bytes, used when gathering deletion candidates.
*
* @return candidate batch size, in bytes
*/
long getCandidateBatchSize() {
return getConfiguration().getAsBytes(Property.GC_CANDIDATE_BATCH_SIZE);
}
/**
* Checks if safe mode is set, in which case files will not be deleted.
*
* @return true if safe mode is set, false otherwise
*/
boolean inSafeMode() {
return getConfiguration().getBoolean(Property.GC_SAFEMODE);
}
@Override
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
public void run() {
final VolumeManager fs = getContext().getVolumeManager();
// Sleep for an initial period, giving the manager time to start up and old data files time to
// fall out of use
log.info("Trying to acquire ZooKeeper lock for garbage collector");
HostAndPort address = startStatsService();
try {
getZooLock(address);
} catch (Exception ex) {
log.error("{}", ex.getMessage(), ex);
System.exit(1);
}
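// Register this process's GC metrics with the metrics system once the ZooKeeper lock is acquired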
MetricsInfo metricsInfo = getContext().getMetricsInfo();
metricsInfo.addServiceTags(getApplicationName(), address);
metricsInfo.addMetricsProducers(new GcMetrics(this));
metricsInfo.init();
try {
long delay = getStartDelay();
log.debug("Sleeping for {} milliseconds before beginning garbage collection cycles", delay);
Thread.sleep(delay);
} catch (InterruptedException e) {
log.warn("{}", e.getMessage(), e);
return;
}
// This is created outside of the run loop and passed to the walogCollector so that
// only a single timed task is created (internal to LiveTServerSet) using SimpleTimer.
final LiveTServerSet liveTServerSet =
new LiveTServerSet(getContext(), (current, deleted, added) -> {
log.debug("Number of current servers {}, tservers added {}, removed {}",
current == null ? -1 : current.size(), added, deleted);
if (log.isTraceEnabled()) {
log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted);
}
});
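// Main collection loop. Each cycle collects unreferenced files for the root, metadata, and user
// data levels, then unreferenced write-ahead logs, optionally compacts or flushes the metadata
// tables, and finally sleeps for the configured cycle delay.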
while (true) {
Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc");
try (Scope outerScope = outerSpan.makeCurrent()) {
Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop");
try (Scope innerScope = innerSpan.makeCurrent()) {
final long tStart = System.nanoTime();
try {
System.gc(); // make room
status.current.started = System.currentTimeMillis();
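// Collect each data level separately; a GCRun holds the context for one level's collection.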
var rootGC = new GCRun(DataLevel.ROOT, getContext());
var mdGC = new GCRun(DataLevel.METADATA, getContext());
var userGC = new GCRun(DataLevel.USER, getContext());
log.info("Starting Root table Garbage Collection.");
status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC);
incrementStatsForRun(rootGC);
logStats();
log.info("Starting Metadata table Garbage Collection.");
status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC);
incrementStatsForRun(mdGC);
logStats();
log.info("Starting User table Garbage Collection.");
status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC);
incrementStatsForRun(userGC);
logStats();
} catch (Exception e) {
TraceUtil.setException(innerSpan, e, false);
log.error("{}", e.getMessage(), e);
} finally {
status.current.finished = System.currentTimeMillis();
status.last = status.current;
gcCycleMetrics.setLastCollect(status.current);
status.current = new GcCycleStats();
}
final long tStop = System.nanoTime();
log.info(String.format("Collect cycle took %.2f seconds",
(TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));
/*
* We want to prune references to fully-replicated WALs from the replication table which
* are no longer referenced in the metadata table before running
* GarbageCollectWriteAheadLogs to ensure we delete as many files as possible.
*/
Span replSpan = TraceUtil.startSpan(this.getClass(), "replicationClose");
try (Scope replScope = replSpan.makeCurrent()) {
@SuppressWarnings("deprecation")
Runnable closeWals =
new org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences(getContext());
closeWals.run();
} catch (Exception e) {
TraceUtil.setException(replSpan, e, false);
log.error("Error trying to close write-ahead logs for replication table", e);
} finally {
replSpan.end();
}
// Clean up any unused write-ahead logs
Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs");
try (Scope walScope = walSpan.makeCurrent()) {
GarbageCollectWriteAheadLogs walogCollector =
new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash());
log.info("Beginning garbage collection of write-ahead logs");
walogCollector.collect(status);
gcCycleMetrics.setLastWalCollect(status.lastLog);
} catch (Exception e) {
TraceUtil.setException(walSpan, e, false);
log.error("{}", e.getMessage(), e);
} finally {
walSpan.end();
}
} catch (Exception e) {
TraceUtil.setException(innerSpan, e, true);
throw e;
} finally {
innerSpan.end();
}
// we just made a lot of metadata changes: flush them out
try {
AccumuloClient accumuloClient = getContext();
final long actionStart = System.nanoTime();
String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
log.debug("gc post action {} started", action);
switch (action) {
case "compact":
accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true);
break;
case "flush":
accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true);
accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true);
break;
default:
log.trace("'none - no action' or invalid value provided: {}", action);
}
final long actionComplete = System.nanoTime();
gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);
log.info("gc post action {} completed in {} seconds", action, String.format("%.2f",
(TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0)));
} catch (Exception e) {
TraceUtil.setException(outerSpan, e, false);
log.warn("{}", e.getMessage(), e);
}
} catch (Exception e) {
TraceUtil.setException(outerSpan, e, true);
throw e;
} finally {
outerSpan.end();
}
try {
gcCycleMetrics.incrementRunCycleCount();
long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
log.debug("Sleeping for {} milliseconds", gcDelay);
Thread.sleep(gcDelay);
} catch (InterruptedException e) {
log.warn("{}", e.getMessage(), e);
return;
}
}
}
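// Adds the results of a single GCRun to the statistics for the current cycle.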
private void incrementStatsForRun(GCRun gcRun) {
status.current.candidates += gcRun.getCandidatesStat();
status.current.inUse += gcRun.getInUseStat();
status.current.deleted += gcRun.getDeletedStat();
status.current.errors += gcRun.getErrorsStat();
}
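// Logs the cumulative statistics for the current collection cycle.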
private void logStats() {
log.info("Number of data file candidates for deletion: {}", status.current.candidates);
log.info("Number of data file candidates still in use: {}", status.current.inUse);
log.info("Number of successfully deleted data files: {}", status.current.deleted);
log.info("Number of data files delete failures: {}", status.current.errors);
log.info("Number of bulk imports in progress: {}", status.current.bulks);
}
/**
* Moves a file to trash. If this garbage collector is not using trash, this method returns false
* and leaves the file alone. If the file is missing, this method returns false as opposed to
* throwing an exception.
*
* @return true if the file was moved to trash
* @throws IOException if the volume manager encountered a problem
*/
boolean moveToTrash(Path path) throws IOException {
final VolumeManager fs = getContext().getVolumeManager();
if (!isUsingTrash()) {
return false;
}
try {
return fs.moveToTrash(path);
} catch (FileNotFoundException ex) {
return false;
}
}
private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
var path = ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK);
LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
@Override
public void unableToMonitorLockNode(final Exception e) {
// ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility
Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e));
}
};
UUID zooLockUUID = UUID.randomUUID();
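// Retry indefinitely until the GC lock is acquired; the lock ensures only one garbage collector
// process is active at a time.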
while (true) {
ServiceLock lock =
new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID);
if (lock.tryLock(lockWatcher,
new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8))) {
log.debug("Got GC ZooKeeper lock");
return;
}
log.debug("Failed to get GC ZooKeeper lock, will retry");
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
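// Starts the Thrift server that reports GC status. As noted in the class comment, this is started
// before the ZooKeeper lock is acquired because it only serves metrics.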
private HostAndPort startStatsService() {
var processor = ThriftProcessorTypes.getGcTProcessor(this, getContext());
IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), port);
long maxMessageSize = getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress server = TServerUtils.startTServer(getConfiguration(),
getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
"GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
getContext().getServerSslParams(), getContext().getSaslParams(), 0,
getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(),
addresses);
log.debug("Starting garbage collector listening on " + server.address);
return server.address;
}
/**
* Checks if the given deletion candidate string refers to a directory, which is the case when the
* string contains exactly one '/' character.
*
* @param delete deletion candidate path
* @return true if the string refers to a directory
*/
static boolean isDir(String delete) {
if (delete == null) {
return false;
}
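// A candidate with exactly one '/' is treated as a directory, e.g. a hypothetical candidate
// "/t-0000001" would be a directory while "/t-0000001/A0000001.rf" would not.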
int slashCount = 0;
for (int i = 0; i < delete.length(); i++) {
if (delete.charAt(i) == '/') {
slashCount++;
}
}
return slashCount == 1;
}
@Override
public GCStatus getStatus(TInfo info, TCredentials credentials) {
return status;
}
public GcCycleMetrics getGcCycleMetrics() {
return gcCycleMetrics;
}
}