/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra.hbase.txprune;
import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
/**
* Persists the prune upper bound and empty-region markers for regions using a daemon thread that flushes
* periodically. An instance of this class should be obtained only through {@link PruneUpperBoundWriterSupplier},
* which also handles the lifecycle of this instance.
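*
* <p>A minimal usage sketch (illustrative only; {@code writer}, {@code regionNameBytes} and
* {@code pruneUpperBound} are hypothetical, and the writer is assumed to have been started by its supplier):
* <pre>{@code
*   // Record the latest prune upper bound for a region; the value is flushed to HBase
*   // asynchronously by the writer's daemon thread.
*   writer.persistPruneEntry(regionNameBytes, pruneUpperBound);
*   // Mark a region as empty at the given time.
*   writer.persistRegionEmpty(regionNameBytes, System.currentTimeMillis());
* }</pre>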
*/
public class PruneUpperBoundWriter extends AbstractIdleService {
private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
private final TableName tableName;
private final DataJanitorState dataJanitorState;
private final long pruneFlushInterval;
// Map of region name -> prune upper bound
private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
// Map of region name -> time the region was found to be empty
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
private volatile boolean stopped;
private long lastChecked;
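/**
* Creates a writer that accumulates prune information in memory and flushes it periodically.
*
* @param tableName name of the table to which prune information is written
* @param dataJanitorState used to persist prune upper bounds and empty-region markers
* @param pruneFlushInterval interval in milliseconds between periodic flushes
*/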
@SuppressWarnings("WeakerAccess")
public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
this.tableName = tableName;
this.dataJanitorState = dataJanitorState;
this.pruneFlushInterval = pruneFlushInterval;
this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
}
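/**
* Records the latest prune upper bound for a region. The value is persisted asynchronously
* by the flush thread.
*
* @param regionName name of the region
* @param pruneUpperBound prune upper bound to record for the region
*/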
@SuppressWarnings("WeakerAccess")
public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
warnIfNotRunning(regionName);
// The number of entries in this map is bounded by the number of regions on this region server,
// so it will not grow indefinitely
pruneEntries.put(regionName, pruneUpperBound);
}
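/**
* Records the time at which a region was found to be empty. The value is persisted asynchronously
* by the flush thread.
*
* @param regionName name of the region
* @param time time at which the region was found to be empty
*/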
@SuppressWarnings("WeakerAccess")
public void persistRegionEmpty(byte[] regionName, long time) {
warnIfNotRunning(regionName);
// The number of entries in this map is bounded by the number of regions on this region server,
// so it will not grow indefinitely
emptyRegions.put(regionName, time);
}
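/**
* Returns true if the flush thread has been started and is still running.
*/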
@SuppressWarnings("WeakerAccess")
public boolean isAlive() {
return flushThread != null && flushThread.isAlive();
}
@Override
protected void startUp() throws Exception {
LOG.info("Starting PruneUpperBoundWriter Thread.");
startFlushThread();
}
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
}
}
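/**
* Starts the daemon thread that periodically flushes the accumulated prune upper bounds and
* empty-region entries to the prune state table as the HBase login user.
*/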
private void startFlushThread() {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// Record prune upper bound
while (!pruneEntries.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
// Remove the entry only if both the key and the value match what was just written, since a new
// prune upper bound for the same key may have been added in the meantime
pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
}
// Record empty regions
while (!emptyRegions.isEmpty()) {
Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
// Remove the entry only if both the key and the value match what was just written, since a new
// value for the same key may have been added in the meantime
emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
}
return null;
}
});
} catch (IOException | InterruptedException ex) {
// Handle any exception that might be thrown during HBase operation
handleException(ex);
}
lastChecked = now;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ex) {
interrupt();
break;
}
}
LOG.info("PruneUpperBound Writer thread terminated.");
}
};
flushThread.setDaemon(true);
flushThread.start();
}
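/**
* Logs a warning if an entry is being persisted while the service is not running or the flush
* thread is no longer alive.
*/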
private void warnIfNotRunning(byte[] regionName) {
if (!isRunning() || !isAlive()) {
LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
}
}
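/**
* Logs a failure to persist prune information and restores the interrupt status if the flush
* thread was interrupted.
*/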
private void handleException(Exception ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}