/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.tserver.tablet;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Violations;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.YieldCallback;
import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.ProbabilitySampler;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.util.FileUtil;
import org.apache.accumulo.server.util.MasterMetadataUtil;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.MinorCompactionReason;
import org.apache.accumulo.tserver.TConstraintViolationException;
import org.apache.accumulo.tserver.TLevel;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.TabletStatsKeeper;
import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
import org.apache.accumulo.tserver.TooManyFilesException;
import org.apache.accumulo.tserver.TservConstraintEnv;
import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
import org.apache.accumulo.tserver.compaction.WriteParameters;
import org.apache.accumulo.tserver.constraints.ConstraintChecker;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.MutationReceiver;
import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.tablet.Compactor.CompactionCanceledException;
import org.apache.accumulo.tserver.tablet.Compactor.CompactionEnv;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
/**
* Provide access to a single row range in a living TabletServer.
*/
public class Tablet implements TabletCommitter {
private static final Logger log = Logger.getLogger(Tablet.class);
private final TabletServer tabletServer;
private final KeyExtent extent;
private final TabletResourceManager tabletResources;
private final DatafileManager datafileManager;
private final TableConfiguration tableConfiguration;
private final String tabletDirectory;
private final Path location; // absolute path of this tablets dir
private final TabletMemory tabletMemory;
private final TabletTime tabletTime;
private final Object timeLock = new Object();
private long persistedTime;
private TServerInstance lastLocation = null;
private volatile boolean tableDirChecked = false;
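// incremented when this tablet's data sources change (e.g. on close); active scans watch this
// counter so they know to switch data sources, or fail if the tablet has closed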
private final AtomicLong dataSourceDeletions = new AtomicLong(0);
public long getDataSourceDeletions() {
return dataSourceDeletions.get();
}
private final Set<ScanDataSource> activeScans = new HashSet<>();
private enum CloseState {
OPEN, CLOSING, CLOSED, COMPLETE
}
private volatile CloseState closeState = CloseState.OPEN;
private boolean updatingFlushID = false;
private long lastFlushID = -1;
private long lastCompactID = -1;
private static class CompactionWaitInfo {
long flushID = -1;
long compactionID = -1;
}
// stores info about a user-initiated major compaction that is waiting on a minor compaction
// to finish
private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
enum CompactionState {
WAITING_TO_START, IN_PROGRESS
}
private volatile CompactionState minorCompactionState = null;
private volatile CompactionState majorCompactionState = null;
private final Set<MajorCompactionReason> majorCompactionQueued =
Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
private final AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<>();
private int writesInProgress = 0;
private final TabletStatsKeeper timer = new TabletStatsKeeper();
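// the Rate fields below are decaying averages; 0.95 is the decay ratio applied on each update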
private final Rate queryRate = new Rate(0.95);
private long queryCount = 0;
private final Rate queryByteRate = new Rate(0.95);
private long queryBytes = 0;
private final Rate ingestRate = new Rate(0.95);
private long ingestCount = 0;
private final Rate ingestByteRate = new Rate(0.95);
private long ingestBytes = 0;
private byte[] defaultSecurityLabel = new byte[0];
private long lastMinorCompactionFinishTime = 0;
private long lastMapFileImportTime = 0;
private volatile long numEntries = 0;
private volatile long numEntriesInMemory = 0;
private final Rate scannedRate = new Rate(0.95);
private final AtomicLong scannedCount = new AtomicLong(0);
private final ConfigurationObserver configObserver;
// Files that are currently in the process of bulk importing. Access to this is protected by the
// tablet lock.
private final Set<FileRef> bulkImporting = new HashSet<>();
// Files that were successfully bulk imported.
private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build();
private final int logId;
@Override
public int getLogId() {
return logId;
}
public static class LookupResult {
public List<Range> unfinishedRanges = new ArrayList<>();
public long bytesAdded = 0;
public long dataSize = 0;
public boolean closed = false;
}
FileRef getNextMapFilename(String prefix) throws IOException {
String extension = FileOperations.getNewFileExtension(tableConfiguration);
checkTabletDir();
return new FileRef(location.toString() + "/" + prefix
+ UniqueNameAllocator.getInstance().getNextName() + "." + extension);
}
private void checkTabletDir() throws IOException {
if (!tableDirChecked) {
FileStatus[] files = null;
try {
files = getTabletServer().getFileSystem().listStatus(location);
} catch (FileNotFoundException ex) {
// ignored
}
if (files == null) {
if (location.getName().startsWith(Constants.CLONE_PREFIX))
log.debug("Tablet " + extent + " had no dir, creating " + location); // its a clone dir...
else
log.warn("Tablet " + extent + " had no dir, creating " + location);
getTabletServer().getFileSystem().mkdirs(location);
}
tableDirChecked = true;
}
}
/**
* Only visible for testing
*/
@VisibleForTesting
protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location,
DatafileManager datafileManager, TabletServer tabletServer,
TabletResourceManager tabletResources, TabletMemory tabletMemory,
TableConfiguration tableConfiguration, KeyExtent extent,
ConfigurationObserver configObserver) {
this.tabletTime = tabletTime;
this.tabletDirectory = tabletDirectory;
this.logId = logId;
this.location = location;
this.datafileManager = datafileManager;
this.tabletServer = tabletServer;
this.tabletResources = tabletResources;
this.tabletMemory = tabletMemory;
this.tableConfiguration = tableConfiguration;
this.extent = extent;
this.configObserver = configObserver;
this.splitCreationTime = 0;
}
public Tablet(final TabletServer tabletServer, final KeyExtent extent,
final TabletResourceManager trm, TabletData data) throws IOException {
this.tabletServer = tabletServer;
this.extent = extent;
this.tabletResources = trm;
this.lastLocation = data.getLastLocation();
this.lastFlushID = data.getFlushID();
this.lastCompactID = data.getCompactID();
this.splitCreationTime = data.getSplitTime();
this.tabletTime = TabletTime.getInstance(data.getTime());
this.persistedTime = tabletTime.getTime();
this.logId = tabletServer.createLogId();
TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
if (null == tblConf) {
Tables.clearCache(tabletServer.getInstance());
tblConf = tabletServer.getTableConfiguration(extent);
requireNonNull(tblConf, "Could not get table configuration for " + extent.getTableId());
}
this.tableConfiguration = tblConf;
// translate any volume changes
VolumeManager fs = tabletServer.getFileSystem();
boolean replicationEnabled =
ReplicationConfigurationUtil.isEnabled(extent, this.tableConfiguration);
TabletFiles tabletPaths =
new TabletFiles(data.getDirectory(), data.getLogEntris(), data.getDataFiles());
tabletPaths = VolumeUtil.updateTabletVolumes(tabletServer, tabletServer.getLock(), fs, extent,
tabletPaths, replicationEnabled);
// deal with relative path for the directory
Path locationPath;
if (tabletPaths.dir.contains(":")) {
locationPath = new Path(tabletPaths.dir);
} else {
locationPath = tabletServer.getFileSystem().getFullPath(FileType.TABLE,
extent.getTableId() + tabletPaths.dir);
}
this.location = locationPath;
this.tabletDirectory = tabletPaths.dir;
for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) {
this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<>(entry.getValue()));
}
setupDefaultSecurityLabels(extent);
final List<LogEntry> logEntries = tabletPaths.logEntries;
final SortedMap<FileRef,DataFileValue> datafiles = tabletPaths.datafiles;
tableConfiguration.addObserver(configObserver = new ConfigurationObserver() {
private void reloadConstraints() {
log.debug("Reloading constraints for extent: " + extent);
constraintChecker.set(new ConstraintChecker(tableConfiguration));
}
@Override
public void propertiesChanged() {
reloadConstraints();
try {
setupDefaultSecurityLabels(extent);
} catch (Exception e) {
log.error("Failed to reload default security labels for extent: " + extent.toString());
}
}
@Override
public void propertyChanged(String prop) {
if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
reloadConstraints();
else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
try {
log.info("Default security labels changed for extent: " + extent.toString());
setupDefaultSecurityLabels(extent);
} catch (Exception e) {
log.error("Failed to reload default security labels for extent: " + extent.toString());
}
}
}
@Override
public void sessionExpired() {
log.trace("Session expired, no longer updating per table props...");
}
});
tableConfiguration.getNamespaceConfiguration().addObserver(configObserver);
tabletMemory = new TabletMemory(this);
// Force a load of any per-table properties
configObserver.propertiesChanged();
if (!logEntries.isEmpty()) {
log.info("Starting Write-Ahead Log recovery for " + this.extent);
final AtomicLong entriesUsedOnTablet = new AtomicLong(0);
// track max time from walog entries without timestamps
final AtomicLong maxTime = new AtomicLong(Long.MIN_VALUE);
final CommitSession commitSession = getTabletMemory().getCommitSession();
try {
Set<String> absPaths = new HashSet<>();
for (FileRef ref : datafiles.keySet())
absPaths.add(ref.path().toString());
tabletServer.recover(this.getTabletServer().getFileSystem(), extent, tableConfiguration,
logEntries, absPaths, new MutationReceiver() {
@Override
public void receive(Mutation m) {
Collection<ColumnUpdate> muts = m.getUpdates();
for (ColumnUpdate columnUpdate : muts) {
if (!columnUpdate.hasTimestamp()) {
// if it is not a user set timestamp, it must have been set
// by the system
maxTime.set(Math.max(maxTime.get(), columnUpdate.getTimestamp()));
}
}
getTabletMemory().mutate(commitSession, Collections.singletonList(m));
entriesUsedOnTablet.incrementAndGet();
}
});
if (maxTime.get() != Long.MIN_VALUE) {
tabletTime.useMaxTimeFromWALog(maxTime.get());
}
commitSession.updateMaxCommittedTime(tabletTime.getTime());
if (entriesUsedOnTablet.get() == 0) {
log.debug("No replayed mutations applied, removing unused entries for " + extent);
MetadataTableUtil.removeUnusedWALEntries(getTabletServer(), extent, logEntries,
tabletServer.getLock());
// No replication update to be made: the fact that this tablet didn't use any mutations
// from the WAL implies nothing about use of this WAL by other tablets. Do nothing.
logEntries.clear();
} else if (ReplicationConfigurationUtil.isEnabled(extent,
tabletServer.getTableConfiguration(extent))) {
// The logs are about to be re-used by this tablet. We need to record that they have data
// for this extent, but that they may get more data. logEntries is not cleared, which will
// cause the elements in logEntries to be added to the currentLogs for this Tablet below.
//
// This update serves the same purpose as an update during a MinC. We know that the WAL
// was defined (written when the WAL was opened), but this lets us know there are mutations
// written to this WAL that could potentially be replicated. Because the Tablet is using
// this WAL, we can be sure that the WAL isn't closed (WRT replication Status) and thus
// we're safe to update its progress.
Status status = StatusUtil.openWithUnknownLength();
for (LogEntry logEntry : logEntries) {
log.debug("Writing updated status to metadata table for " + logEntry.filename + " "
+ ProtobufUtil.toString(status));
ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status);
}
}
} catch (Throwable t) {
String msg = "Error recovering tablet " + extent + " from log files";
if (tableConfiguration.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
log.warn(msg, t);
} else {
throw new RuntimeException(msg, t);
}
}
// make some closed references that represent the recovered logs
currentLogs = new HashSet<>();
for (LogEntry logEntry : logEntries) {
currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename,
logEntry.getColumnQualifier().toString()));
}
rebuildReferencedLogs();
log.info(
"Write-Ahead Log recovery complete for " + this.extent + " (" + entriesUsedOnTablet.get()
+ " mutations applied, " + getTabletMemory().getNumEntries() + " entries created)");
}
String contextName = tableConfiguration.get(Property.TABLE_CLASSPATH);
if (contextName != null && !contextName.isEmpty()) {
// initialize context classloader, instead of possibly waiting for it to initialize for a scan
// TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292
AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
}
// do this last, after the tablet is completely set up, because it
// could cause a major compaction to start
datafileManager = new DatafileManager(this, datafiles);
computeNumEntries();
getDatafileManager().removeFilesAfterScan(data.getScanFiles());
// look for hints of a failure on the previous tablet server
if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
// look for any temp files hanging around
removeOldTemporaryFiles();
}
log.log(TLevel.TABLET_HIST, extent + " opened");
}
private void removeOldTemporaryFiles() {
// remove any temporary files created by a previous tablet server
try {
for (FileStatus tmp : getTabletServer().getFileSystem()
.globStatus(new Path(location, "*_tmp"))) {
try {
log.debug("Removing old temp file " + tmp.getPath());
getTabletServer().getFileSystem().delete(tmp.getPath());
} catch (IOException ex) {
log.error("Unable to remove old temp file " + tmp.getPath() + ": " + ex);
}
}
} catch (IOException ex) {
log.error("Error scanning for old temp files in " + location);
}
}
private void setupDefaultSecurityLabels(KeyExtent extent) {
if (extent.isMeta()) {
defaultSecurityLabel = new byte[0];
} else {
try {
ColumnVisibility cv = new ColumnVisibility(
tableConfiguration.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
this.defaultSecurityLabel = cv.getExpression();
} catch (Exception e) {
log.error(e, e);
this.defaultSecurityLabel = new byte[0];
}
}
}
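// Core multi-range lookup: seeks the merged iterator over each range, collecting results
// until the memory cap, the batch timeout, a yield, or a tablet close stops the batch.
// Ranges that were not fully scanned are reported back in LookupResult.unfinishedRanges.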
private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize, long batchTimeOut)
throws IOException {
LookupResult lookupResult = new LookupResult();
boolean exceededMemoryUsage = false;
boolean tabletClosed = false;
Set<ByteSequence> cfset = null;
if (columnSet.size() > 0)
cfset = LocalityGroupUtil.families(columnSet);
long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
long startNanos = System.nanoTime();
if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
batchTimeOut = 0;
}
// determine if the iterator supports yielding
YieldCallback<Key> yield = new YieldCallback<>();
if (mmfi instanceof YieldingKeyValueIterator)
((YieldingKeyValueIterator<Key,Value>) mmfi).enableYielding(yield);
boolean yielded = false;
for (Range range : ranges) {
boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
lookupResult.unfinishedRanges.add(range);
continue;
}
int entriesAdded = 0;
try {
if (cfset != null)
mmfi.seek(range, cfset, true);
else
mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
while (mmfi.hasTop()) {
if (yield.hasYielded()) {
throw new IOException("Coding error: hasTop returned true but has yielded at "
+ yield.getPositionAndReset());
}
Key key = mmfi.getTopKey();
KVEntry kve = new KVEntry(key, mmfi.getTopValue());
results.add(kve);
entriesAdded++;
lookupResult.bytesAdded += kve.estimateMemoryUsed();
lookupResult.dataSize += kve.numBytes();
exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
if (exceededMemoryUsage || timesUp) {
addUnfinishedRange(lookupResult, range, key, false);
break;
}
mmfi.next();
}
if (yield.hasYielded()) {
yielded = true;
Key yieldPosition = yield.getPositionAndReset();
if (!range.contains(yieldPosition)) {
throw new IOException("Underlying iterator yielded to a position outside of its range: "
+ yieldPosition + " not in " + range);
}
if (!results.isEmpty()
&& yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
throw new IOException("Underlying iterator yielded to a position"
+ " that does not follow the last key returned: " + yieldPosition + " <= "
+ results.get(results.size() - 1).getKey());
}
addUnfinishedRange(lookupResult, range, yieldPosition, false);
log.debug("Scan yield detected at position " + yieldPosition);
Metrics scanMetrics = getTabletServer().getScanMetrics();
if (scanMetrics.isEnabled())
scanMetrics.add(TabletServerScanMetrics.YIELD, 1);
}
} catch (TooManyFilesException tmfe) {
// treat this as a closed tablet, and let the client retry
log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run");
handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
entriesAdded);
tabletClosed = true;
} catch (IOException ioe) {
if (shutdownInProgress()) {
// assume HDFS shutdown hook caused this exception
log.debug("IOException while shutdown in progress ", ioe);
handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
entriesAdded);
tabletClosed = true;
} else {
throw ioe;
}
} catch (IterationInterruptedException iie) {
if (isClosed()) {
handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
entriesAdded);
tabletClosed = true;
} else {
throw iie;
}
} catch (TabletClosedException tce) {
handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
entriesAdded);
tabletClosed = true;
}
}
return lookupResult;
}
private void handleTabletClosedDuringScan(List<KVEntry> results, LookupResult lookupResult,
boolean exceededMemoryUsage, Range range, int entriesAdded) {
if (exceededMemoryUsage)
throw new IllegalStateException(
"Tablet " + extent + "should not exceed memory usage or close, not both");
if (entriesAdded > 0)
addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey(), false);
else
lookupResult.unfinishedRanges.add(range);
lookupResult.closed = true;
}
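// Record the unscanned remainder of a range (from key to the range's end) so the client can
// continue later; nothing is added if key is already at or past the range's end key.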
private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key,
boolean inclusiveStartKey) {
if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
Range nlur =
new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive());
lookupResult.unfinishedRanges.add(nlur);
}
}
public void checkConditions(ConditionChecker checker, Authorizations authorizations,
AtomicBoolean iFlag) throws IOException {
ScanDataSource dataSource =
new ScanDataSource(this, authorizations, this.defaultSecurityLabel, iFlag);
try {
SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
checker.check(iter);
} catch (IOException ioe) {
dataSource.close(true);
throw ioe;
} finally {
// this is in a finally block because we always want to
// return the mapfiles, even when an exception is thrown
dataSource.close(false);
}
}
public LookupResult lookup(List<Range> ranges, HashSet<Column> columns,
Authorizations authorizations, List<KVEntry> results, long maxResultSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag,
SamplerConfiguration samplerConfig, long batchTimeOut, String classLoaderContext)
throws IOException {
if (ranges.size() == 0) {
return new LookupResult();
}
ranges = Range.mergeOverlapping(ranges);
if (ranges.size() > 1) {
Collections.sort(ranges);
}
Range tabletRange = extent.toDataRange();
for (Range range : ranges) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
tabletRange.clip(range);
}
ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel,
columns, ssiList, ssio, interruptFlag, samplerConfig, batchTimeOut, classLoaderContext);
LookupResult result = null;
try {
SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
result = lookup(iter, ranges, columns, results, maxResultSize, batchTimeOut);
return result;
} catch (IOException ioe) {
dataSource.close(true);
throw ioe;
} finally {
// this is in a finally block because we always want to
// return the mapfiles, even when an exception is thrown
dataSource.close(false);
synchronized (this) {
queryCount += results.size();
if (result != null)
queryBytes += result.dataSize;
}
}
}
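// Read the next batch of entries for a scan: collect up to num entries, stopping early when
// maxResultsSize bytes are buffered, the batch timeout fires, or the iterator yields. A
// non-null continueKey in the returned Batch tells the caller where to resume.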
Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns,
long batchTimeOut, boolean isolated) throws IOException {
long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
long startNanos = System.nanoTime();
if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
batchTimeOut = 0;
}
List<KVEntry> results = new ArrayList<>();
Key key = null;
Value value;
long resultSize = 0L;
long resultBytes = 0L;
long maxResultsSize = tableConfiguration.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
Key continueKey = null;
boolean skipContinueKey = false;
YieldCallback<Key> yield = new YieldCallback<>();
// we cannot yield if we are in isolation mode
if (!isolated) {
if (iter instanceof YieldingKeyValueIterator)
((YieldingKeyValueIterator<Key,Value>) iter).enableYielding(yield);
}
if (columns.size() == 0) {
iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
} else {
iter.seek(range, LocalityGroupUtil.families(columns), true);
}
while (iter.hasTop()) {
if (yield.hasYielded()) {
throw new IOException(
"Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
}
value = iter.getTopValue();
key = iter.getTopKey();
KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;
if (resultSize >= maxResultsSize || results.size() >= num || timesUp) {
continueKey = new Key(key);
skipContinueKey = true;
break;
}
iter.next();
}
if (yield.hasYielded()) {
continueKey = new Key(yield.getPositionAndReset());
skipContinueKey = true;
if (!range.contains(continueKey)) {
throw new IOException("Underlying iterator yielded to a position outside of its range: "
+ continueKey + " not in " + range);
}
if (!results.isEmpty()
&& continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
throw new IOException(
"Underlying iterator yielded to a position that does not follow the last key returned: "
+ continueKey + " <= " + results.get(results.size() - 1).getKey());
}
log.debug("Scan yield detected at position " + continueKey);
Metrics scanMetrics = getTabletServer().getScanMetrics();
if (scanMetrics.isEnabled())
scanMetrics.add(TabletServerScanMetrics.YIELD, 1);
} else if (!iter.hasTop()) {
// end of tablet has been reached
continueKey = null;
if (results.size() == 0)
results = null;
}
return new Batch(skipContinueKey, results, continueKey, resultBytes);
}
/**
* Determine if a JVM shutdown is in progress.
*/
boolean shutdownInProgress() {
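// removing a (never registered) shutdown hook throws IllegalStateException once the JVM
// has begun its shutdown sequence, so this acts as a cheap shutdown probe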
try {
Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
@Override
public void run() {}
}));
} catch (IllegalStateException ise) {
return true;
}
return false;
}
public Scanner createScanner(Range range, int num, Set<Column> columns,
Authorizations authorizations, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
boolean isolated, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig,
long batchTimeOut, String classLoaderContext) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
extent.toDataRange().clip(range);
ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns,
ssiList, ssio, interruptFlag, isolated, samplerConfig, batchTimeOut, classLoaderContext);
return new Scanner(this, range, opts);
}
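// Write the in-memory map to a new data file (optionally merging in mergeFile), then bring
// the result online, recording timing stats and minor compaction metrics along the way.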
DataFileValue minorCompact(VolumeManager fs, InMemoryMap memTable, FileRef tmpDatafile,
FileRef newDatafile, FileRef mergeFile, boolean hasQueueTime, long queued,
CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
boolean failed = false;
long start = System.currentTimeMillis();
timer.incrementStatusMinor();
long count = 0;
String oldName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("Minor compacting " + this.extent);
Span span = Trace.start("write");
CompactionStats stats;
try {
count = memTable.getNumEntries();
DataFileValue dfv = null;
if (mergeFile != null)
dfv = getDatafileManager().getDatafileSizes().get(mergeFile);
MinorCompactor compactor = new MinorCompactor(tabletServer, this, memTable, mergeFile, dfv,
tmpDatafile, mincReason, tableConfiguration);
stats = compactor.call();
} finally {
span.stop();
}
span = Trace.start("bringOnline");
try {
getDatafileManager().bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile,
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession,
flushId);
} finally {
span.stop();
}
return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
} catch (Exception e) {
failed = true;
throw new RuntimeException("Exception occurred during minor compaction on " + extent, e);
} catch (Error e) {
// Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
failed = true;
throw new RuntimeException("Exception occurred during minor compaction on " + extent, e);
} finally {
Thread.currentThread().setName(oldName);
try {
getTabletMemory().finalizeMinC();
} catch (Throwable t) {
log.error("Failed to free tablet memory", t);
}
if (!failed) {
lastMinorCompactionFinishTime = System.currentTimeMillis();
}
Metrics minCMetrics = getTabletServer().getMinCMetrics();
if (minCMetrics.isEnabled())
minCMetrics.add(TabletServerMinCMetrics.MINC, (lastMinorCompactionFinishTime - start));
if (hasQueueTime) {
timer.updateTime(Operation.MINOR, queued, start, count, failed);
if (minCMetrics.isEnabled())
minCMetrics.add(TabletServerMinCMetrics.QUEUE, (start - queued));
} else
timer.updateTime(Operation.MINOR, start, count, failed);
}
}
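// Swap in a fresh memtable and write-ahead log set for new writes (currentLogs moves to
// otherLogs), reserve an optional merge file, and build the task that will run the MinC.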
private synchronized MinorCompactionTask prepareForMinC(long flushId,
MinorCompactionReason mincReason) {
Preconditions.checkState(otherLogs.isEmpty());
Preconditions.checkState(referencedLogs.equals(currentLogs));
CommitSession oldCommitSession = getTabletMemory().prepareForMinC();
otherLogs = currentLogs;
currentLogs = new HashSet<>();
FileRef mergeFile = null;
if (mincReason != MinorCompactionReason.RECOVERY) {
mergeFile = getDatafileManager().reserveMergingMinorCompactionFile();
}
double tracePercent =
tabletServer.getConfiguration().getFraction(Property.TSERV_MINC_TRACE_PERCENT);
return new MinorCompactionTask(this, mergeFile, oldCommitSession, flushId, mincReason,
tracePercent);
}
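// Handle a table flush request: if the memtable is empty, just persist the new flush ID to
// the metadata table; otherwise initiate a minor compaction.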
public void flush(long tableFlushID) {
boolean updateMetadata = false;
boolean initiateMinor = false;
try {
synchronized (this) {
// only one thread at a time may update the flush ID, to ensure that the metadata table and
// the tablet's in-memory state stay consistent
if (updatingFlushID)
return;
if (lastFlushID >= tableFlushID)
return;
if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC())
return;
if (getTabletMemory().getMemTable().getNumEntries() == 0) {
lastFlushID = tableFlushID;
updatingFlushID = true;
updateMetadata = true;
} else
initiateMinor = true;
}
if (updateMetadata) {
// if multiple threads were allowed to update this outside of a sync block, then it would be
// a race condition
MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, tabletServer,
getTabletServer().getLock());
} else if (initiateMinor)
initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
} finally {
if (updateMetadata) {
synchronized (this) {
updatingFlushID = false;
this.notifyAll();
}
}
}
}
public boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
if (isClosed()) {
// don't bother trying to get the flush id if closed... could be closed after this check but
// that is ok... just trying to cut down on unneeded log messages....
return false;
}
// get the flush id before the new memmap is made available for write
long flushId;
try {
flushId = getFlushID();
} catch (NoNodeException e) {
log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " "
+ e.getMessage());
return false;
}
return initiateMinorCompaction(flushId, mincReason);
}
public boolean minorCompactNow(MinorCompactionReason mincReason) {
long flushId;
try {
flushId = getFlushID();
} catch (NoNodeException e) {
log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " "
+ e.getMessage());
return false;
}
MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
if (mct == null)
return false;
mct.run();
return true;
}
boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) {
MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
if (mct == null)
return false;
getTabletResources().executeMinorCompaction(mct);
return true;
}
private MinorCompactionTask createMinorCompactionTask(long flushId,
MinorCompactionReason mincReason) {
MinorCompactionTask mct;
long t1, t2;
StringBuilder logMessage = null;
try {
synchronized (this) {
t1 = System.currentTimeMillis();
if (isClosing() || isClosed() || majorCompactionState == CompactionState.WAITING_TO_START
|| getTabletMemory().memoryReservedForMinC()
|| getTabletMemory().getMemTable().getNumEntries() == 0 || updatingFlushID) {
logMessage = new StringBuilder();
logMessage.append(extent.toString());
logMessage.append(" closeState " + closeState);
logMessage.append(" majorCompactionState " + majorCompactionState);
if (getTabletMemory() != null)
logMessage.append(" tabletMemory.memoryReservedForMinC() "
+ getTabletMemory().memoryReservedForMinC());
if (getTabletMemory() != null && getTabletMemory().getMemTable() != null)
logMessage.append(" tabletMemory.getMemTable().getNumEntries() "
+ getTabletMemory().getMemTable().getNumEntries());
logMessage.append(" updatingFlushID " + updatingFlushID);
return null;
}
mct = prepareForMinC(flushId, mincReason);
t2 = System.currentTimeMillis();
}
} finally {
// log outside of sync block
if (logMessage != null && log.isDebugEnabled())
log.debug(logMessage);
}
log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
return mct;
}
public long getFlushID() throws NoNodeException {
try {
String zTablePath = Constants.ZROOT + "/" + tabletServer.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + extent.getTableId() + Constants.ZTABLE_FLUSH_ID;
return Long
.parseLong(new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8));
} catch (InterruptedException e) {
throw new RuntimeException("Exception on " + extent + " getting flush ID", e);
} catch (NumberFormatException nfe) {
throw new RuntimeException("Exception on " + extent + " getting flush ID", nfe);
} catch (KeeperException ke) {
if (ke instanceof NoNodeException) {
throw (NoNodeException) ke;
} else {
throw new RuntimeException("Exception on " + extent + " getting flush ID", ke);
}
}
}
long getCompactionCancelID() {
String zTablePath = Constants.ZROOT + "/" + tabletServer.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + extent.getTableId() + Constants.ZTABLE_COMPACT_CANCEL_ID;
try {
return Long
.parseLong(new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8));
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public Pair<Long,UserCompactionConfig> getCompactionID() throws NoNodeException {
try {
String zTablePath = Constants.ZROOT + "/" + tabletServer.getInstance().getInstanceID()
+ Constants.ZTABLES + "/" + extent.getTableId() + Constants.ZTABLE_COMPACT_ID;
String[] tokens =
new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8).split(",");
long compactID = Long.parseLong(tokens[0]);
UserCompactionConfig compactionConfig = new UserCompactionConfig();
if (tokens.length > 1) {
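// the remaining token has the form name=<hex>; the hex payload is a serialized
// UserCompactionConfig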
Hex hex = new Hex();
ByteArrayInputStream bais =
new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes(UTF_8)));
DataInputStream dis = new DataInputStream(bais);
try {
compactionConfig.readFields(dis);
} catch (IOException e) {
throw new RuntimeException(e);
}
KeyExtent ke = new KeyExtent(extent.getTableId(), compactionConfig.getEndRow(),
compactionConfig.getStartRow());
if (!ke.overlaps(extent)) {
// only use iterators if compaction range overlaps
compactionConfig = new UserCompactionConfig();
}
}
return new Pair<>(compactID, compactionConfig);
} catch (InterruptedException e) {
throw new RuntimeException("Exception on " + extent + " getting compaction ID", e);
} catch (NumberFormatException nfe) {
throw new RuntimeException("Exception on " + extent + " getting compaction ID", nfe);
} catch (KeeperException ke) {
if (ke instanceof NoNodeException) {
throw (NoNodeException) ke;
} else {
throw new RuntimeException("Exception on " + extent + " getting compaction ID", ke);
}
} catch (DecoderException e) {
throw new RuntimeException("Exception on " + extent + " getting compaction ID", e);
}
}
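// Reserve a commit session for a prepared batch of mutations: increment writesInProgress
// (so close waits for this write) and record the batch's max time. Returns null if the
// tablet has closed.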
private synchronized CommitSession finishPreparingMutations(long time) {
if (writesInProgress < 0) {
throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
+ "increment a negative number of writes in progress " + writesInProgress + "on tablet "
+ extent);
}
if (isClosed() || getTabletMemory() == null) {
return null;
}
writesInProgress++;
CommitSession commitSession = getTabletMemory().getCommitSession();
commitSession.incrementCommitsInProgress();
commitSession.updateMaxCommittedTime(time);
return commitSession;
}
public void checkConstraints() {
ConstraintChecker cc = constraintChecker.get();
if (cc.classLoaderChanged()) {
ConstraintChecker ncc = new ConstraintChecker(tableConfiguration);
constraintChecker.compareAndSet(cc, ncc);
}
}
public CommitSession prepareMutationsForCommit(TservConstraintEnv cenv, List<Mutation> mutations)
throws TConstraintViolationException {
ConstraintChecker cc = constraintChecker.get();
List<Mutation> violators = null;
Violations violations = new Violations();
cenv.setExtent(extent);
for (Mutation mutation : mutations) {
Violations more = cc.check(cenv, mutation);
if (more != null) {
violations.add(more);
if (violators == null)
violators = new ArrayList<>();
violators.add(mutation);
}
}
long time = tabletTime.setUpdateTimes(mutations);
if (!violations.isEmpty()) {
HashSet<Mutation> violatorsSet = new HashSet<>(violators);
ArrayList<Mutation> nonViolators = new ArrayList<>();
for (Mutation mutation : mutations) {
if (!violatorsSet.contains(mutation)) {
nonViolators.add(mutation);
}
}
CommitSession commitSession = null;
if (nonViolators.size() > 0) {
// if everything is a violation, then it is expected that
// code calling this will not log or commit
commitSession = finishPreparingMutations(time);
if (commitSession == null)
return null;
}
throw new TConstraintViolationException(violations, violators, nonViolators, commitSession);
}
return finishPreparingMutations(time);
}
@Override
public synchronized void abortCommit(CommitSession commitSession, List<Mutation> value) {
if (writesInProgress <= 0) {
throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress);
}
if (isCloseComplete() || getTabletMemory() == null) {
throw new IllegalStateException("aborting commit when tablet is closed");
}
commitSession.decrementCommitsInProgress();
writesInProgress--;
if (writesInProgress == 0)
this.notifyAll();
}
@Override
public void commit(CommitSession commitSession, List<Mutation> mutations) {
int totalCount = 0;
long totalBytes = 0;
// write the mutation to the in memory table
for (Mutation mutation : mutations) {
totalCount += mutation.size();
totalBytes += mutation.numBytes();
}
getTabletMemory().mutate(commitSession, mutations);
synchronized (this) {
if (writesInProgress < 1) {
throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
+ "decrement the number of writes in progress " + writesInProgress
+ " to < 0 on tablet " + extent);
}
if (isCloseComplete()) {
throw new IllegalStateException(
"Tablet " + extent + " closed with outstanding messages to the logger");
}
getTabletMemory().updateMemoryUsageStats();
// decrement here in case an exception is thrown below
writesInProgress--;
if (writesInProgress == 0)
this.notifyAll();
commitSession.decrementCommitsInProgress();
numEntries += totalCount;
numEntriesInMemory += totalCount;
ingestCount += totalCount;
ingestBytes += totalBytes;
}
}
/**
* Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is
* performed.
*/
public void close(boolean saveState) throws IOException {
initiateClose(saveState, false, false);
completeClose(saveState, true);
}
void initiateClose(boolean saveState, boolean queueMinC, boolean disableWrites) {
if (!saveState && queueMinC) {
throw new IllegalArgumentException("Bad state initiating close on " + extent
+ ". State not saved and requested minor compactions queue");
}
log.debug("initiateClose(saveState=" + saveState + " queueMinC=" + queueMinC + " disableWrites="
+ disableWrites + ") " + getExtent());
MinorCompactionTask mct = null;
synchronized (this) {
if (isClosed() || isClosing()) {
String msg = "Tablet " + getExtent() + " already " + closeState;
throw new IllegalStateException(msg);
}
// enter the closing state, no splits, minor, or major compactions can start
// should cause running major compactions to stop
closeState = CloseState.CLOSING;
this.notifyAll();
// determines if inserts and queries can still continue while minor compacting
if (disableWrites) {
closeState = CloseState.CLOSED;
}
// wait for major compactions to finish, setting closing to
// true should cause any running major compactions to abort
while (isMajorCompactionRunning()) {
try {
this.wait(50);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
while (updatingFlushID) {
try {
this.wait(50);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
if (!saveState || getTabletMemory().getMemTable().getNumEntries() == 0) {
return;
}
getTabletMemory().waitForMinC();
try {
mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE);
} catch (NoNodeException e) {
throw new RuntimeException("Exception on " + extent + " during prep for MinC", e);
}
if (queueMinC) {
getTabletResources().executeMinorCompaction(mct);
return;
}
}
// do minor compaction outside of synch block so that tablet can be read and written to while
// compaction runs
mct.run();
}
private boolean closeCompleting = false;
synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException {
if (!isClosing() || isCloseComplete() || closeCompleting) {
throw new IllegalStateException("Bad close state " + closeState + " on tablet " + extent);
}
log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") "
+ getExtent());
// ensure this method is only called once, also guards against multiple
// threads entering the method at the same time
closeCompleting = true;
closeState = CloseState.CLOSED;
// modify dataSourceDeletions so scans will try to switch data sources and fail because the
// tablet is closed
dataSourceDeletions.incrementAndGet();
for (ScanDataSource activeScan : activeScans) {
activeScan.interrupt();
}
// wait for reads and writes to complete
while (writesInProgress > 0 || activeScans.size() > 0) {
try {
this.wait(50);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
getTabletMemory().waitForMinC();
if (saveState && getTabletMemory().getMemTable().getNumEntries() > 0) {
try {
prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE).run();
} catch (NoNodeException e) {
throw new RuntimeException("Exception on " + extent + " during prep for MinC", e);
}
}
if (saveState) {
// at this point all tablet data is flushed, so do a consistency check
RuntimeException err = null;
for (int i = 0; i < 5; i++) {
try {
closeConsistencyCheck();
err = null;
break;
} catch (RuntimeException t) {
err = t;
log.error("Consistency check fails, retrying " + t);
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
}
if (err != null) {
ProblemReports.getInstance(tabletServer).report(new ProblemReport(extent.getTableId(),
ProblemType.TABLET_LOAD, this.extent.toString(), err));
log.error("Tablet closed consistency check has failed for " + this.extent
+ " giving up and closing");
}
}
try {
getTabletMemory().getMemTable().delete(0);
} catch (Throwable t) {
log.error("Failed to delete mem table : " + t.getMessage() + " for tablet " + extent, t);
}
getTabletMemory().close();
// close map files
getTabletResources().close();
log.log(TLevel.TABLET_HIST, extent + " closed");
tableConfiguration.getNamespaceConfiguration().removeObserver(configObserver);
tableConfiguration.removeObserver(configObserver);
if (completeClose)
closeState = CloseState.COMPLETE;
}
private void closeConsistencyCheck() {
if (getTabletMemory().getMemTable().getNumEntries() != 0) {
String msg = "Closed tablet " + extent + " has "
+ getTabletMemory().getMemTable().getNumEntries() + " entries in memory";
log.error(msg);
throw new RuntimeException(msg);
}
if (getTabletMemory().memoryReservedForMinC()) {
String msg = "Closed tablet " + extent + " has minor compacting memory";
log.error(msg);
throw new RuntimeException(msg);
}
try {
Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>> fileLog =
MetadataTableUtil.getFileAndLogEntries(tabletServer, extent);
if (fileLog.getFirst().size() != 0) {
String msg = "Closed tablet " + extent + " has walog entries in " + MetadataTable.NAME + " "
+ fileLog.getFirst();
log.error(msg);
throw new RuntimeException(msg);
}
if (extent.isRootTablet()) {
if (!fileLog.getSecond().keySet()
.equals(getDatafileManager().getDatafileSizes().keySet())) {
String msg = "Data file in " + RootTable.NAME + " differ from in memory data " + extent
+ " " + fileLog.getSecond().keySet() + " "
+ getDatafileManager().getDatafileSizes().keySet();
log.error(msg);
throw new RuntimeException(msg);
}
} else {
if (!fileLog.getSecond().equals(getDatafileManager().getDatafileSizes())) {
String msg =
"Data file in " + MetadataTable.NAME + " differ from in memory data " + extent + " "
+ fileLog.getSecond() + " " + getDatafileManager().getDatafileSizes();
log.error(msg);
throw new RuntimeException(msg);
}
}
} catch (Exception e) {
String msg = "Failed to do close consistency check for tablet " + extent;
log.error(msg, e);
throw new RuntimeException(msg, e);
}
if (otherLogs.size() != 0 || currentLogs.size() != 0 || referencedLogs.size() != 0) {
String msg = "Closed tablet " + extent + " has walog entries in memory currentLogs = "
+ currentLogs + " otherLogs = " + otherLogs + " refererncedLogs = " + referencedLogs;
log.error(msg);
throw new RuntimeException(msg);
}
// TODO check lastFlushID and lastCompactID - ACCUMULO-1290
}
/**
* Returns a Path object representing the tablet's location on the DFS.
*
* @return location
*/
public Path getLocation() {
return location;
}
public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) {
if (isClosing() || isClosed() || !needsMajorCompaction(reason) || isMajorCompactionRunning()
|| majorCompactionQueued.contains(reason)) {
return false;
}
majorCompactionQueued.add(reason);
getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
return false;
}
/**
* Returns true if a major compaction should be performed on the tablet.
*
*/
public boolean needsMajorCompaction(MajorCompactionReason reason) {
if (isMajorCompactionRunning())
return false;
if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
return true;
return getTabletResources().needsMajorCompaction(getDatafileManager().getDatafileSizes(),
reason);
}
/**
* Returns the total size, in bytes, of the data files served by this tablet.
*
* @return size
*/
// this is the size of just the files
public long estimateTabletSize() {
long size = 0L;
for (DataFileValue sz : getDatafileManager().getDatafileSizes().values())
size += sz.getSize();
return size;
}
private boolean sawBigRow = false;
private long timeOfLastMinCWhenBigFreakinRowWasSeen = 0;
private long timeOfLastImportWhenBigFreakinRowWasSeen = 0;
private final long splitCreationTime;
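// Choose a split point for this tablet: find the key nearest the midpoint of its data
// files, back off when a single row dominates the tablet, and shorten the chosen split row
// to the shortest prefix that still separates the two halves.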
private SplitRowSpec findSplitRow(Collection<FileRef> files) {
// never split the root tablet
// check if we already decided that we can never split
// check to see if we're big enough to split
long splitThreshold = tableConfiguration.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
long maxEndRow = tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE);
if (extent.isRootTablet() || estimateTabletSize() <= splitThreshold) {
return null;
}
// have seen a big row before, do not bother checking unless a minor compaction or map file
// import has occurred.
if (sawBigRow) {
if (timeOfLastMinCWhenBigFreakinRowWasSeen != lastMinorCompactionFinishTime
|| timeOfLastImportWhenBigFreakinRowWasSeen != lastMapFileImportTime) {
// a minor compaction or map file import has occurred... check again
sawBigRow = false;
} else {
// nothing changed, do not split
return null;
}
}
SortedMap<Double,Key> keys = null;
try {
// we should make .25 below configurable
keys = FileUtil.findMidPoint(getTabletServer().getFileSystem(),
getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(),
FileUtil.toPathStrings(files), .25);
} catch (IOException e) {
log.error("Failed to find midpoint " + e.getMessage());
return null;
}
if (keys.isEmpty()) {
log.info("Cannot split tablet " + extent + ", files contain no data for tablet.");
// set the following to keep tablet from attempting to split until the tablets set of files
// changes.
sawBigRow = true;
timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
return null;
}
// check to see if one row takes up most of the tablet, in which case we can not split
try {
Text lastRow;
if (extent.getEndRow() == null) {
Key lastKey = (Key) FileUtil.findLastKey(getTabletServer().getFileSystem(),
getTabletServer().getConfiguration(), files);
lastRow = lastKey.getRow();
} else {
lastRow = extent.getEndRow();
}
// We expect to get a midPoint for this set of files. If we don't get one, we have a problem.
final Key mid = keys.get(.5);
if (mid == null) {
throw new IllegalStateException("Could not determine midpoint for files on " + extent);
}
// check to see that the midPoint is not equal to the end key
if (mid.compareRow(lastRow) == 0) {
if (keys.firstKey() < .5) {
Key candidate = keys.get(keys.firstKey());
if (candidate.getLength() > maxEndRow) {
log.warn("Cannot split tablet " + extent
+ ", selected split point too long. Length : " + candidate.getLength());
sawBigRow = true;
timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
return null;
}
if (candidate.compareRow(lastRow) != 0) {
// we should use this ratio in split size estimations
if (log.isTraceEnabled())
log.trace(
String.format("Splitting at %6.2f instead of .5, row at .5 is same as end row%n",
keys.firstKey()));
return new SplitRowSpec(keys.firstKey(), candidate.getRow());
}
}
log.warn("Cannot split tablet " + extent + " it contains a big row : " + lastRow);
sawBigRow = true;
timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
return null;
}
Text text = mid.getRow();
SortedMap<Double,Key> firstHalf = keys.headMap(.5);
if (firstHalf.size() > 0) {
Text beforeMid = firstHalf.get(firstHalf.lastKey()).getRow();
Text shorter = new Text();
int trunc = longestCommonLength(text, beforeMid);
shorter.set(text.getBytes(), 0, Math.min(text.getLength(), trunc + 1));
text = shorter;
}
if (text.getLength() > maxEndRow) {
log.warn("Cannot split tablet " + extent + ", selected split point too long. Length : "
+ text.getLength());
sawBigRow = true;
timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
return null;
}
return new SplitRowSpec(.5, text);
} catch (IOException e) {
// don't split now, but check again later
log.error("Failed to find lastkey " + e.getMessage());
return null;
}
}
private static int longestCommonLength(Text text, Text beforeMid) {
int common = 0;
while (common < text.getLength() && common < beforeMid.getLength()
&& text.getBytes()[common] == beforeMid.getBytes()[common]) {
common++;
}
return common;
}
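// Open each data file just long enough to read its first and last key; used when planning a
// chop compaction to determine which files contain data outside this tablet's range.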
private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(SortedMap<FileRef,DataFileValue> allFiles)
throws IOException {
Map<FileRef,Pair<Key,Key>> result = new HashMap<>();
FileOperations fileFactory = FileOperations.getInstance();
VolumeManager fs = getTabletServer().getFileSystem();
for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
FileRef file = entry.getKey();
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
FileSKVIterator openReader =
fileFactory.newReaderBuilder().forFile(file.path().toString(), ns, ns.getConf())
.withTableConfiguration(this.getTableConfiguration()).seekToBeginning().build();
try {
Key first = openReader.getFirstKey();
Key last = openReader.getLastKey();
result.put(file, new Pair<>(first, last));
} finally {
openReader.close();
}
}
return result;
}
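/**
 * Selects the files a chop compaction must rewrite: files whose first or last key falls
 * outside the tablet's range, empty files, and files whose first and last keys are unknown
 * because they appeared after the keys were gathered.
 */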
List<FileRef> findChopFiles(KeyExtent extent, Map<FileRef,Pair<Key,Key>> firstAndLastKeys,
Collection<FileRef> allFiles) throws IOException {
List<FileRef> result = new ArrayList<>();
if (firstAndLastKeys == null) {
result.addAll(allFiles);
return result;
}
for (FileRef file : allFiles) {
Pair<Key,Key> pair = firstAndLastKeys.get(file);
if (pair == null) {
// The file was created or imported after we obtained the first and last keys. There are a
// few options here: throw an exception, which would cause the compaction to retry and emit
// an ugly error message the admin has to ignore; go get the first and last keys, which is
// undesirable because this code runs while the tablet lock is held; or just compact the
// file. Compacting the file is the simplest option.
result.add(file);
} else {
Key first = pair.getFirst();
Key last = pair.getSecond();
// If first and last are null, it's an empty file. Add it to the compact set so it goes
// away.
if ((first == null && last == null) || (first != null && !extent.contains(first.getRow()))
|| (last != null && !extent.contains(last.getRow()))) {
result.add(file);
}
}
}
return result;
}
/**
 * Returns true if this tablet needs to be split.
 */
public synchronized boolean needsSplit() {
if (isClosing() || isClosed())
return false;
return findSplitRow(getDatafileManager().getFiles()) != null;
}
// BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION
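// The compaction is planned while holding the tablet lock (selecting input files and
// reserving them) and the actual rewriting of files happens outside the lock, in batches
// of at most TSERV_MAJC_THREAD_MAXOPEN files per pass.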
private CompactionStats _majorCompact(MajorCompactionReason reason)
throws IOException, CompactionCanceledException {
long t1, t2, t3;
Pair<Long,UserCompactionConfig> compactionId = null;
CompactionStrategy strategy = null;
Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
if (reason == MajorCompactionReason.USER) {
try {
compactionId = getCompactionID();
strategy = createCompactionStrategy(compactionId.getSecond().getCompactionStrategy());
} catch (NoNodeException e) {
throw new RuntimeException("Exception on " + extent + " during MajC", e);
}
} else if (reason == MajorCompactionReason.NORMAL || reason == MajorCompactionReason.IDLE) {
strategy = Property.createTableInstanceFromPropertyName(tableConfiguration,
Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
new DefaultCompactionStrategy());
strategy.init(Property.getCompactionStrategyOptions(tableConfiguration));
} else if (reason == MajorCompactionReason.CHOP) {
firstAndLastKeys = getFirstAndLastKeys(getDatafileManager().getDatafileSizes());
} else {
throw new IllegalArgumentException(
"Unknown compaction reason " + reason + " during MajC on " + extent);
}
if (strategy != null) {
MajorCompactionRequest request = new MajorCompactionRequest(extent, reason,
getTabletServer().getFileSystem(), tableConfiguration);
request.setFiles(getDatafileManager().getDatafileSizes());
strategy.gatherInformation(request);
}
Map<FileRef,DataFileValue> filesToCompact = null;
int maxFilesToCompact = tableConfiguration.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
CompactionStats majCStats = new CompactionStats();
CompactionPlan plan = null;
boolean propagateDeletes = false;
boolean updateCompactionID = false;
synchronized (this) {
// plan all the work that needs to be done inside the sync block, then do the actual work
// outside the sync block
t1 = System.currentTimeMillis();
majorCompactionState = CompactionState.WAITING_TO_START;
getTabletMemory().waitForMinC();
t2 = System.currentTimeMillis();
majorCompactionState = CompactionState.IN_PROGRESS;
notifyAll();
VolumeManager fs = getTabletServer().getFileSystem();
if (extent.isRootTablet()) {
// very important that we call this before doing major compaction,
// otherwise deleted compacted files could possibly be brought back
// at some point if the file they were compacted to was legitimately
// removed by a major compaction
RootFiles.cleanupReplacement(fs, fs.listStatus(this.location), false);
}
SortedMap<FileRef,DataFileValue> allFiles = getDatafileManager().getDatafileSizes();
List<FileRef> inputFiles = new ArrayList<>();
if (reason == MajorCompactionReason.CHOP) {
// enforce rules: files with keys outside our range need to be compacted
inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, allFiles.keySet()));
} else {
MajorCompactionRequest request =
new MajorCompactionRequest(extent, reason, fs, tableConfiguration);
request.setFiles(allFiles);
plan = strategy.getCompactionPlan(request);
if (plan != null) {
plan.validate(allFiles.keySet());
inputFiles.addAll(plan.inputFiles);
}
}
if (inputFiles.isEmpty()) {
if (reason == MajorCompactionReason.USER) {
if (compactionId.getSecond().getIterators().isEmpty()) {
log.debug(
"No-op major compaction by USER on 0 input files because no iterators present.");
lastCompactID = compactionId.getFirst();
updateCompactionID = true;
} else {
log.debug("Major compaction by USER on 0 input files with iterators.");
filesToCompact = new HashMap<>();
}
} else {
return majCStats;
}
} else {
// If no original files will exist at the end of the compaction, we do not have to propagate
// deletes
Set<FileRef> droppedFiles = new HashSet<>();
droppedFiles.addAll(inputFiles);
if (plan != null)
droppedFiles.addAll(plan.deleteFiles);
propagateDeletes = !(droppedFiles.equals(allFiles.keySet()));
log.debug("Major compaction plan: " + plan + " propagate deletes : " + propagateDeletes);
filesToCompact = new HashMap<>(allFiles);
filesToCompact.keySet().retainAll(inputFiles);
getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet());
}
t3 = System.currentTimeMillis();
}
try {
log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) / 1000.0,
(t2 - t1) / 1000.0));
if (updateCompactionID) {
MetadataTableUtil.updateTabletCompactID(extent, compactionId.getFirst(), tabletServer,
getTabletServer().getLock());
return majCStats;
}
if (!propagateDeletes && compactionId == null) {
// compacting everything, so update the compaction id in metadata
try {
compactionId = getCompactionID();
if (compactionId.getSecond().getCompactionStrategy() != null) {
compactionId = null;
// TODO maybe return unless chop?
}
} catch (NoNodeException e) {
throw new RuntimeException("Exception on " + extent + " during MajC", e);
}
}
List<IteratorSetting> compactionIterators = new ArrayList<>();
if (compactionId != null) {
if (reason == MajorCompactionReason.USER) {
if (getCompactionCancelID() >= compactionId.getFirst()) {
// compaction was canceled
return majCStats;
}
compactionIterators = compactionId.getSecond().getIterators();
synchronized (this) {
if (lastCompactID >= compactionId.getFirst())
// already compacted
return majCStats;
}
}
}
// need to handle case where only one file is being major compacted
// ACCUMULO-3645 run loop at least once, even if filesToCompact.isEmpty()
do {
int numToCompact = maxFilesToCompact;
if (filesToCompact.size() > maxFilesToCompact
&& filesToCompact.size() < 2 * maxFilesToCompact) {
// on the second to last compaction pass, compact the minimum amount of files possible
numToCompact = filesToCompact.size() - maxFilesToCompact + 1;
}
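// e.g. with maxFilesToCompact = 10 and 13 input files, the first pass compacts only the
// 4 smallest files (13 - 10 + 1); the 9 remaining files plus the 1 file produced leave
// exactly 10 files for the final pass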
Set<FileRef> smallestFiles = removeSmallest(filesToCompact, numToCompact);
FileRef fileName =
getNextMapFilename((filesToCompact.size() == 0 && !propagateDeletes) ? "A" : "C");
FileRef compactTmpName = new FileRef(fileName.path().toString() + "_tmp");
AccumuloConfiguration tableConf = createTableConfiguration(tableConfiguration, plan);
Span span = Trace.start("compactFiles");
try {
CompactionEnv cenv = new CompactionEnv() {
@Override
public boolean isCompactionEnabled() {
// avoid calling isClosing() because it's synchronized and this is called frequently
// during compaction
return closeState != CloseState.CLOSING;
}
@Override
public IteratorScope getIteratorScope() {
return IteratorScope.majc;
}
@Override
public RateLimiter getReadLimiter() {
return getTabletServer().getMajorCompactionReadLimiter();
}
@Override
public RateLimiter getWriteLimiter() {
return getTabletServer().getMajorCompactionWriteLimiter();
}
};
HashMap<FileRef,DataFileValue> copy =
new HashMap<>(getDatafileManager().getDatafileSizes());
if (!copy.keySet().containsAll(smallestFiles))
throw new IllegalStateException("Cannot find data file values for " + smallestFiles
+ " on " + extent + " during MajC");
copy.keySet().retainAll(smallestFiles);
log.debug("Starting MajC " + extent + " (" + reason + ") " + copy.keySet() + " --> "
+ compactTmpName + " " + compactionIterators);
// always propagate deletes, unless last batch
boolean lastBatch = filesToCompact.isEmpty();
Compactor compactor = new Compactor(tabletServer, this, copy, null, compactTmpName,
lastBatch ? propagateDeletes : true, cenv, compactionIterators, reason.ordinal(),
tableConf);
CompactionStats mcs = compactor.call();
span.data("files", "" + smallestFiles.size());
span.data("read", "" + mcs.getEntriesRead());
span.data("written", "" + mcs.getEntriesWritten());
majCStats.add(mcs);
if (lastBatch && plan != null && plan.deleteFiles != null) {
smallestFiles.addAll(plan.deleteFiles);
}
getDatafileManager().bringMajorCompactionOnline(smallestFiles, compactTmpName, fileName,
filesToCompact.size() == 0 && compactionId != null ? compactionId.getFirst() : null,
new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
// when major compaction produces a file w/ zero entries, it will be deleted... do not
// want to add the deleted file
if (filesToCompact.size() > 0 && mcs.getEntriesWritten() > 0) {
filesToCompact.put(fileName,
new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
}
} finally {
span.stop();
}
} while (filesToCompact.size() > 0);
return majCStats;
} finally {
synchronized (Tablet.this) {
getDatafileManager().clearMajorCompactingFile();
}
}
}
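/**
 * Overlays any write parameters from the compaction plan (block sizes, compression type,
 * replication) on top of the table's configuration for the compaction's output file.
 */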
protected AccumuloConfiguration createTableConfiguration(TableConfiguration base,
CompactionPlan plan) {
if (plan == null || plan.writeParameters == null)
return base;
WriteParameters p = plan.writeParameters;
ConfigurationCopy result = new ConfigurationCopy(base);
if (p.getHdfsBlockSize() > 0)
result.set(Property.TABLE_FILE_BLOCK_SIZE, "" + p.getHdfsBlockSize());
if (p.getBlockSize() > 0)
result.set(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, "" + p.getBlockSize());
if (p.getIndexBlockSize() > 0)
result.set(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, "" + p.getIndexBlockSize());
if (p.getCompressType() != null)
result.set(Property.TABLE_FILE_COMPRESSION_TYPE, p.getCompressType());
if (p.getReplication() != 0)
result.set(Property.TABLE_FILE_REPLICATION, "" + p.getReplication());
return result;
}
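/**
 * Removes up to maxFilesToCompact of the smallest files from filesToCompact and returns
 * them. Ties on size are broken by file name so the selection is deterministic.
 */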
private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact,
int maxFilesToCompact) {
// ensure this method works properly when multiple files have the same size
// short-circuit; also handles zero files case
if (filesToCompact.size() <= maxFilesToCompact) {
Set<FileRef> smallestFiles = new HashSet<>(filesToCompact.keySet());
filesToCompact.clear();
return smallestFiles;
}
PriorityQueue<Pair<FileRef,Long>> fileHeap =
new PriorityQueue<>(filesToCompact.size(), new Comparator<Pair<FileRef,Long>>() {
@Override
public int compare(Pair<FileRef,Long> o1, Pair<FileRef,Long> o2) {
// compare sizes by value (not by Long reference identity) and break ties by file
// name so the comparator stays consistent when files have the same size
int cmp = Long.compare(o1.getSecond(), o2.getSecond());
return cmp != 0 ? cmp : o1.getFirst().compareTo(o2.getFirst());
}
});
for (Entry<FileRef,DataFileValue> entry : filesToCompact.entrySet()) {
fileHeap.add(new Pair<>(entry.getKey(), entry.getValue().getSize()));
}
Set<FileRef> smallestFiles = new HashSet<>();
while (smallestFiles.size() < maxFilesToCompact && fileHeap.size() > 0) {
Pair<FileRef,Long> pair = fileHeap.remove();
filesToCompact.remove(pair.getFirst());
smallestFiles.add(pair.getFirst());
}
return smallestFiles;
}
// END PRIVATE METHODS RELATED TO MAJOR COMPACTION
/**
 * Performs a major compaction on the tablet. If the tablet is closing or closed, no longer
 * needs a major compaction, already has one running, or should be split instead, nothing is
 * done and null is returned.
 */
CompactionStats majorCompact(MajorCompactionReason reason, long queued) {
CompactionStats majCStats = null;
boolean success = false;
long start = System.currentTimeMillis();
timer.incrementStatusMajor();
synchronized (this) {
// check that compaction is still needed - defer to splitting
majorCompactionQueued.remove(reason);
if (isClosing() || isClosed() || !needsMajorCompaction(reason) || isMajorCompactionRunning()
|| needsSplit()) {
return null;
}
majorCompactionState = CompactionState.WAITING_TO_START;
}
Span span = null;
try {
double tracePercent =
tabletServer.getConfiguration().getFraction(Property.TSERV_MAJC_TRACE_PERCENT);
ProbabilitySampler sampler = new ProbabilitySampler(tracePercent);
span = Trace.on("majorCompaction", sampler);
majCStats = _majorCompact(reason);
if (reason == MajorCompactionReason.CHOP) {
MetadataTableUtil.chopped(getTabletServer(), getExtent(), this.getTabletServer().getLock());
getTabletServer()
.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.CHOPPED, extent));
}
success = true;
} catch (CompactionCanceledException cce) {
log.debug("Major compaction canceled, extent = " + getExtent());
} catch (IOException ioe) {
log.error("MajC Failed, extent = " + getExtent(), ioe);
} catch (RuntimeException e) {
log.error("MajC Unexpected exception, extent = " + getExtent(), e);
} finally {
// ensure we always reset boolean, even
// when an exception is thrown
synchronized (this) {
majorCompactionState = null;
this.notifyAll();
}
if (span != null) {
span.data("extent", "" + getExtent());
if (majCStats != null) {
span.data("read", "" + majCStats.getEntriesRead());
span.data("written", "" + majCStats.getEntriesWritten());
}
span.stop();
}
}
long count = 0;
if (majCStats != null)
count = majCStats.getEntriesRead();
timer.updateTime(Operation.MAJOR, queued, start, count, !success);
return majCStats;
}
@Override
public KeyExtent getExtent() {
return extent;
}
synchronized void computeNumEntries() {
Collection<DataFileValue> vals = getDatafileManager().getDatafileSizes().values();
long numEntries = 0;
for (DataFileValue tableValue : vals) {
numEntries += tableValue.getNumEntries();
}
this.numEntriesInMemory = getTabletMemory().getNumEntries();
numEntries += getTabletMemory().getNumEntries();
this.numEntries = numEntries;
}
public long getNumEntries() {
return numEntries;
}
public long getNumEntriesInMemory() {
return numEntriesInMemory;
}
public synchronized boolean isClosing() {
return closeState == CloseState.CLOSING;
}
public synchronized boolean isClosed() {
return closeState == CloseState.CLOSED || closeState == CloseState.COMPLETE;
}
public synchronized boolean isCloseComplete() {
return closeState == CloseState.COMPLETE;
}
public boolean isMajorCompactionRunning() {
return majorCompactionState != null;
}
public boolean isMinorCompactionQueued() {
return minorCompactionState == CompactionState.WAITING_TO_START;
}
public boolean isMinorCompactionRunning() {
return minorCompactionState == CompactionState.IN_PROGRESS;
}
public boolean isMajorCompactionQueued() {
return majorCompactionQueued.size() > 0;
}
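/**
 * Splits this tablet at the given row, or at a computed midpoint when sp is null. The
 * tablet is closed, the low and high halves are written to the metadata table, and the data
 * for the two children is returned. Returns null if the tablet could not be closed for
 * splitting or no usable split row was found.
 */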
public TreeMap<KeyExtent,TabletData> split(byte[] sp) throws IOException {
if (sp != null && extent.getEndRow() != null && extent.getEndRow().equals(new Text(sp))) {
throw new IllegalArgumentException(
"Attempting to split on EndRow " + extent.getEndRow() + " for " + extent);
}
if (sp != null
&& sp.length > tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE)) {
String msg = "Cannot split tablet " + extent + ", selected split point too long. Length : "
+ sp.length;
log.warn(msg);
throw new IOException(msg);
}
if (extent.isRootTablet()) {
String msg = "Cannot split root tablet";
log.warn(msg);
throw new RuntimeException(msg);
}
try {
initiateClose(true, false, false);
} catch (IllegalStateException ise) {
log.debug("File " + extent + " not splitting : " + ise.getMessage());
return null;
}
// obtain this info outside of synch block since it will involve opening
// the map files... it is ok if the set of map files changes, because
// this info is used for optimization... it is ok if map files are missing
// from the set... can still query and insert into the tablet while this
// map file operation is happening
Map<FileRef,FileUtil.FileInfo> firstAndLastRows =
FileUtil.tryToGetFirstAndLastRows(getTabletServer().getFileSystem(),
getTabletServer().getConfiguration(), getDatafileManager().getFiles());
synchronized (this) {
// java needs tuples ...
TreeMap<KeyExtent,TabletData> newTablets = new TreeMap<>();
long t1 = System.currentTimeMillis();
// choose a split point
SplitRowSpec splitPoint;
if (sp == null)
splitPoint = findSplitRow(getDatafileManager().getFiles());
else {
Text tsp = new Text(sp);
splitPoint =
new SplitRowSpec(FileUtil.estimatePercentageLTE(getTabletServer().getFileSystem(),
getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(),
FileUtil.toPathStrings(getDatafileManager().getFiles()), tsp), tsp);
}
if (splitPoint == null || splitPoint.row == null) {
log.info("had to abort split because splitRow was null");
closeState = CloseState.OPEN;
return null;
}
closeState = CloseState.CLOSING;
completeClose(true, false);
Text midRow = splitPoint.row;
double splitRatio = splitPoint.splitRatio;
KeyExtent low = new KeyExtent(extent.getTableId(), midRow, extent.getPrevEndRow());
KeyExtent high = new KeyExtent(extent.getTableId(), extent.getEndRow(), midRow);
String lowDirectory =
createTabletDirectory(getTabletServer().getFileSystem(), extent.getTableId(), midRow);
// write new tablet information to MetadataTable
SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<>();
SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<>();
List<FileRef> highDatafilesToRemove = new ArrayList<>();
MetadataTableUtil.splitDatafiles(extent.getTableId(), midRow, splitRatio, firstAndLastRows,
getDatafileManager().getDatafileSizes(), lowDatafileSizes, highDatafileSizes,
highDatafilesToRemove);
log.debug("Files for low split " + low + " " + lowDatafileSizes.keySet());
log.debug("Files for high split " + high + " " + highDatafileSizes.keySet());
String time = tabletTime.getMetadataValue();
MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer(),
getTabletServer().getLock());
MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory,
getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time,
lastFlushID, lastCompactID, getTabletServer().getLock());
MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove,
getTabletServer(), getTabletServer().getLock());
log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);
newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation, getBulkIngestedFiles()));
newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation, getBulkIngestedFiles()));
long t2 = System.currentTimeMillis();
log.debug(String.format("offline split time : %6.2f secs", (t2 - t1) / 1000.0));
closeState = CloseState.COMPLETE;
return newTablets;
}
}
public SortedMap<FileRef,DataFileValue> getDatafiles() {
return getDatafileManager().getDatafileSizes();
}
public double queryRate() {
return queryRate.rate();
}
public double queryByteRate() {
return queryByteRate.rate();
}
public double ingestRate() {
return ingestRate.rate();
}
public double ingestByteRate() {
return ingestByteRate.rate();
}
public double scanRate() {
return scannedRate.rate();
}
public long totalQueries() {
return this.queryCount;
}
public long totalIngest() {
return this.ingestCount;
}
// TODO determine whether this method needs to be synchronized
public void updateRates(long now) {
queryRate.update(now, queryCount);
queryByteRate.update(now, queryBytes);
ingestRate.update(now, ingestCount);
ingestByteRate.update(now, ingestBytes);
scannedRate.update(now, scannedCount.get());
}
public long getSplitCreationTime() {
return splitCreationTime;
}
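/**
 * Registers bulk imported files with this tablet. Files already imported for the given
 * transaction, or currently being imported by another thread, are skipped. After the
 * metadata update, the tablet is checked for a needed split or major compaction.
 */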
public void importMapFiles(long tid, Map<FileRef,MapFileInfo> fileMap, boolean setTime)
throws IOException {
Map<FileRef,DataFileValue> entries = new HashMap<>(fileMap.size());
List<String> files = new ArrayList<>();
for (Entry<FileRef,MapFileInfo> entry : fileMap.entrySet()) {
entries.put(entry.getKey(), new DataFileValue(entry.getValue().estimatedSize, 0L));
files.add(entry.getKey().path().toString());
}
// Don't proceed if we spent too long waiting for the lock; clients will have timed out
// and will think that this operation failed.
long now = System.currentTimeMillis();
synchronized (this) {
if (isClosed()) {
throw new IOException("tablet " + extent + " is closed");
}
// TODO check seems unneeded now - ACCUMULO-1291
long lockWait = System.currentTimeMillis() - now;
if (lockWait
> getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) {
throw new IOException(
"Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet lock");
}
List<FileRef> alreadyImported = bulkImported.getIfPresent(tid);
if (alreadyImported != null) {
for (FileRef entry : alreadyImported) {
if (fileMap.remove(entry) != null) {
log.info("Ignoring import of bulk file already imported: " + entry);
}
}
}
Iterator<FileRef> fiter = fileMap.keySet().iterator();
while (fiter.hasNext()) {
FileRef file = fiter.next();
if (bulkImporting.contains(file)) {
log.info("Ignoring import of bulk file currently importing: " + file);
fiter.remove();
}
}
if (fileMap.isEmpty()) {
return;
}
if (writesInProgress < 0) {
throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
+ "increment a negative number of writes in progress " + writesInProgress + "on tablet "
+ extent);
}
// prevent other threads from processing these files while they are added to the metadata table.
bulkImporting.addAll(fileMap.keySet());
writesInProgress++;
}
try {
tabletServer.updateBulkImportState(files, BulkImportState.LOADING);
getDatafileManager().importMapFiles(tid, entries, setTime);
lastMapFileImportTime = System.currentTimeMillis();
if (needsSplit()) {
getTabletServer().executeSplit(this);
} else {
initiateMajorCompaction(MajorCompactionReason.NORMAL);
}
} finally {
synchronized (this) {
if (writesInProgress < 1)
throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
+ "decrement the number of writes in progress " + writesInProgress
+ " to < 0 on tablet " + extent);
writesInProgress--;
if (writesInProgress == 0)
this.notifyAll();
if (!bulkImporting.removeAll(fileMap.keySet())) {
throw new AssertionError(
"Likely bug in code, always expect to remove something. Please open an Accumulo issue.");
}
try {
bulkImported.get(tid, new Callable<List<FileRef>>() {
@Override
public List<FileRef> call() throws Exception {
return new ArrayList<>();
}
}).addAll(fileMap.keySet());
} catch (Exception ex) {
log.info(ex.toString(), ex);
}
tabletServer.removeBulkImportState(files);
}
}
}
private Set<DfsLogger> currentLogs = new HashSet<>();
private Set<DfsLogger> otherLogs = Collections.emptySet();
// An immutable copy of currentLogs + otherLogs. This exists so that removeInUseLogs() does not
// have to get the tablet lock. See #558
private volatile Set<DfsLogger> referencedLogs = Collections.emptySet();
private synchronized void rebuildReferencedLogs() {
/*
 * Each tablet has the following sets of WALogs. While a WALog exists in one set, garbage
 * collection must be avoided.
 *
 * 1. WALogs for the active in memory map
 *
 * 2. WALogs for the minor compacting in memory map
 *
 * 3. WALogs for a newly minor compacted file that is being added to the metadata table.
 *
 * Set 1 is currentLogs. Set 2 is otherLogs. Set 3 only exists in referencedLogs as a side
 * effect of not calling this method in beginClearingUnusedLogs() when otherLogs is cleared.
 *
 * Ensuring referencedLogs accurately tracks these sets ensures in-use WALogs are not GCed.
 */
Builder<DfsLogger> builder = ImmutableSet.builder();
builder.addAll(currentLogs);
builder.addAll(otherLogs);
referencedLogs = builder.build();
}
public void removeInUseLogs(Set<DfsLogger> candidates) {
candidates.removeAll(referencedLogs);
}
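/**
 * Initiates a minor compaction if this tablet references too many write ahead logs: either
 * the current in memory map references at least TABLE_MINC_LOGS_MAX logs, or the tablet
 * still references one of the older closed logs.
 */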
public void checkIfMinorCompactionNeededForLogs(List<DfsLogger> closedLogs) {
// grab this outside of tablet lock.
int maxLogs = tableConfiguration.getCount(Property.TABLE_MINC_LOGS_MAX);
String reason = null;
synchronized (this) {
if (currentLogs.size() >= maxLogs) {
reason = "referenced " + currentLogs.size() + " write ahead logs";
} else if (maxLogs < closedLogs.size()) {
// If each tablet references only a single WAL, but different tablets reference different
// WALs, the tablet server can end up referencing many WALs overall. For recovery that
// would mean processing lots of WALs. This check looks for a reference to an older WAL
// and initiates a compaction if one is found. It assumes the most recent WALs are at the
// end of the list and ignores those.
List<DfsLogger> oldClosed = closedLogs.subList(0, closedLogs.size() - maxLogs);
for (DfsLogger closedLog : oldClosed) {
if (currentLogs.contains(closedLog)) {
reason = "referenced at least one old write ahead log " + closedLog.getFileName();
break;
}
}
}
}
if (reason != null) {
// initiate and log outside of tablet lock
initiateMinorCompaction(MinorCompactionReason.SYSTEM);
log.debug("Initiating minor compaction for " + getExtent() + " because " + reason);
}
}
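/**
 * Computes the set of WALog metadata entries this tablet no longer needs (those referenced
 * only by the minor compacted memory map). The log lock is acquired here and held until
 * finishClearingUnusedLogs() is called; the log lock is always acquired before the tablet
 * lock.
 */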
Set<String> beginClearingUnusedLogs() {
Set<String> unusedLogs = new HashSet<>();
ArrayList<String> otherLogsCopy = new ArrayList<>();
ArrayList<String> currentLogsCopy = new ArrayList<>();
// do not hold tablet lock while acquiring the log lock
logLock.lock();
synchronized (this) {
if (removingLogs)
throw new IllegalStateException("Attempted to clear logs when removal of logs in progress");
for (DfsLogger logger : otherLogs) {
otherLogsCopy.add(logger.toString());
unusedLogs.add(logger.getMeta());
}
for (DfsLogger logger : currentLogs) {
currentLogsCopy.add(logger.toString());
unusedLogs.remove(logger.getMeta());
}
otherLogs = Collections.emptySet();
// Intentionally NOT calling rebuildReferencedLogs() here as that could cause GC of in-use
// walogs (see #539). The clearing of otherLogs is reflected in referencedLogs when
// finishClearingUnusedLogs() calls rebuildReferencedLogs(). See the comments in
// rebuildReferencedLogs() for more info.
if (unusedLogs.size() > 0)
removingLogs = true;
}
// do debug logging outside tablet lock
for (String logger : otherLogsCopy) {
log.debug("Logs for memory compacted: " + getExtent() + " " + logger);
}
for (String logger : currentLogsCopy) {
log.debug("Logs for current memory: " + getExtent() + " " + logger);
}
for (String logger : unusedLogs) {
log.debug("Logs to be destroyed: " + getExtent() + " " + logger);
}
return unusedLogs;
}
synchronized void finishClearingUnusedLogs() {
removingLogs = false;
rebuildReferencedLogs();
logLock.unlock();
}
private boolean removingLogs = false;
// this lock is used to serialize writing of log info to the metadata table
private final ReentrantLock logLock = new ReentrantLock();
// don't release the lock if this method returns true for success; instead, the caller should
// clean up by calling finishUpdatingLogsUsed()
@Override
public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
boolean releaseLock = true;
// do not hold tablet lock while acquiring the log lock
logLock.lock();
try {
synchronized (this) {
if (isCloseComplete()) {
throw new IllegalStateException("Can not update logs of closed tablet " + extent);
}
boolean addToOther;
if (memTable == getTabletMemory().getMinCMemTable())
addToOther = true;
else if (memTable == getTabletMemory().getMemTable())
addToOther = false;
else
throw new IllegalArgumentException("Passed in memtable that is not in use for " + extent);
if (mincFinish) {
if (addToOther)
throw new IllegalStateException("Adding to other logs for mincFinish on " + extent);
if (otherLogs.size() != 0)
throw new IllegalStateException("Expect other logs to be 0 when minC finish, but its "
+ otherLogs + " for " + extent);
// when writing a minc finish event, there is no need to add the log to metadata
// if nothing has been logged for the tablet since the minor compaction started
if (currentLogs.size() == 0)
return !releaseLock;
}
int numAdded = 0;
int numContained = 0;
if (addToOther) {
if (otherLogs.add(more))
numAdded++;
if (currentLogs.contains(more))
numContained++;
} else {
if (currentLogs.add(more))
numAdded++;
if (otherLogs.contains(more))
numContained++;
}
if (numAdded > 0) {
rebuildReferencedLogs();
}
if (numAdded > 0 && numAdded != 1) {
// expect to add all or none
throw new IllegalArgumentException(
"Added subset of logs " + extent + " " + more + " " + currentLogs);
}
if (numContained > 0 && numContained != 1) {
// expect to contain all or none
throw new IllegalArgumentException(
"Other logs contained subset of logs " + extent + " " + more + " " + otherLogs);
}
if (numAdded > 0 && numContained == 0) {
releaseLock = false;
}
return !releaseLock;
}
} finally {
if (releaseLock)
logLock.unlock();
}
}
@Override
public void finishUpdatingLogsUsed() {
logLock.unlock();
}
public synchronized void chopFiles() {
initiateMajorCompaction(MajorCompactionReason.CHOP);
}
private CompactionStrategy createCompactionStrategy(CompactionStrategyConfig strategyConfig) {
String context = tableConfiguration.get(Property.TABLE_CLASSPATH);
String clazzName = strategyConfig.getClassName();
try {
Class<? extends CompactionStrategy> clazz;
if (context != null && !context.equals(""))
clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, clazzName,
CompactionStrategy.class);
else
clazz = AccumuloVFSClassLoader.loadClass(clazzName, CompactionStrategy.class);
CompactionStrategy strategy = clazz.newInstance();
strategy.init(strategyConfig.getOptions());
return strategy;
} catch (Exception e) {
throw new RuntimeException("Error creating compaction strategy on " + extent, e);
}
}
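/**
 * Handles a user-initiated compaction of all files. If a minor compaction is running, the
 * request is deferred until it finishes (see ACCUMULO-3041). When the strategy decides no
 * files need compacting, only the compaction ID is written to the metadata table so the
 * compaction is still recorded as complete.
 */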
public void compactAll(long compactionId, UserCompactionConfig compactionConfig) {
boolean updateMetadata = false;
synchronized (this) {
if (lastCompactID >= compactionId)
return;
if (isMinorCompactionRunning()) {
// want to wait for running minc to finish before starting majc, see ACCUMULO-3041
if (compactionWaitInfo.compactionID == compactionId) {
if (lastFlushID == compactionWaitInfo.flushID)
return;
} else {
compactionWaitInfo.compactionID = compactionId;
compactionWaitInfo.flushID = lastFlushID;
return;
}
}
if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER)
|| isMajorCompactionRunning())
return;
CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
MajorCompactionRequest request = new MajorCompactionRequest(extent,
MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
request.setFiles(getDatafileManager().getDatafileSizes());
try {
if (strategy.shouldCompact(request)) {
initiateMajorCompaction(MajorCompactionReason.USER);
} else {
majorCompactionState = CompactionState.IN_PROGRESS;
updateMetadata = true;
lastCompactID = compactionId;
}
} catch (IOException e) {
throw new RuntimeException("IOException on " + extent + " during compact all", e);
}
}
if (updateMetadata) {
try {
// if multiple threads were allowed to update this outside of a sync block, then it would be
// a race condition
MetadataTableUtil.updateTabletCompactID(extent, compactionId, getTabletServer(),
getTabletServer().getLock());
} finally {
synchronized (this) {
majorCompactionState = null;
this.notifyAll();
}
}
}
}
@Override
public TableConfiguration getTableConfiguration() {
return tableConfiguration;
}
@Override
public Durability getDurability() {
return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
}
@Override
public void updateMemoryUsageStats(long size, long mincSize) {
getTabletResources().updateMemoryUsageStats(this, size, mincSize);
}
public long incrementDataSourceDeletions() {
return dataSourceDeletions.incrementAndGet();
}
public synchronized void updateQueryStats(int size, long numBytes) {
queryCount += size;
queryBytes += numBytes;
}
TabletServer getTabletServer() {
return tabletServer;
}
public void updatePersistedTime(long bulkTime, Map<FileRef,DataFileValue> paths, long tid) {
synchronized (timeLock) {
if (bulkTime > persistedTime)
persistedTime = bulkTime;
MetadataTableUtil.updateTabletDataFile(tid, extent, paths,
tabletTime.getMetadataValue(persistedTime), getTabletServer(),
getTabletServer().getLock());
}
}
public void updateTabletDataFile(long maxCommittedTime, FileRef newDatafile, FileRef absMergeFile,
DataFileValue dfv, Set<String> unusedWalLogs, Set<FileRef> filesInUseByScans, long flushId) {
synchronized (timeLock) {
if (maxCommittedTime > persistedTime)
persistedTime = maxCommittedTime;
String time = tabletTime.getMetadataValue(persistedTime);
MasterMetadataUtil.updateTabletDataFile(getTabletServer(), extent, newDatafile, absMergeFile,
dfv, time, filesInUseByScans, tabletServer.getClientAddressString(),
tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
}
}
TabletResourceManager getTabletResources() {
return tabletResources;
}
DatafileManager getDatafileManager() {
return datafileManager;
}
TabletMemory getTabletMemory() {
return tabletMemory;
}
public long getAndUpdateTime() {
return tabletTime.getAndUpdateTime();
}
public void flushComplete(long flushId) {
lastLocation = null;
dataSourceDeletions.incrementAndGet();
tabletMemory.finishedMinC();
lastFlushID = flushId;
computeNumEntries();
}
public TServerInstance resetLastLocation() {
TServerInstance result = lastLocation;
lastLocation = null;
return result;
}
public synchronized void addActiveScans(ScanDataSource scanDataSource) {
activeScans.add(scanDataSource);
}
public int removeScan(ScanDataSource scanDataSource) {
activeScans.remove(scanDataSource);
return activeScans.size();
}
public synchronized void setLastCompactionID(Long compactionId) {
if (compactionId != null)
this.lastCompactID = compactionId;
}
public void removeMajorCompactionQueuedReason(MajorCompactionReason reason) {
majorCompactionQueued.remove(reason);
}
public void minorCompactionWaitingToStart() {
minorCompactionState = CompactionState.WAITING_TO_START;
}
public void minorCompactionStarted() {
minorCompactionState = CompactionState.IN_PROGRESS;
}
public void minorCompactionComplete() {
minorCompactionState = null;
}
public TabletStats getTabletStats() {
return timer.getTabletStats();
}
public AtomicLong getScannedCounter() {
return scannedCount;
}
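/**
 * Creates (and fully qualifies) the directory for a new tablet, retrying every few seconds
 * on failure. A null endRow means the default tablet directory; otherwise a uniquely named
 * directory is allocated.
 */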
@SuppressModernizer
private static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
String lowDirectory;
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
String volume = fs.choose(Optional.of(tableId), ServerConstants.getBaseUris())
+ Constants.HDFS_TABLES_DIR + Path.SEPARATOR;
while (true) {
try {
if (endRow == null) {
lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
FileSystem pathFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory())
.toString();
}
log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
} else {
lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath))
throw new IllegalStateException("Attempting to create tablet dir for tableID " + tableId
+ " and dir exists when it should not: " + lowDirectoryPath);
if (fs.mkdirs(lowDirectoryPath)) {
FileSystem lowDirectoryFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
return lowDirectoryPath
.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory())
.toString();
}
}
} catch (IOException e) {
log.warn(e);
}
log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume
+ " + will retry ...");
sleepUninterruptibly(3, TimeUnit.SECONDS);
}
}
public Map<Long,List<FileRef>> getBulkIngestedFiles() {
return new HashMap<>(bulkImported.asMap());
}
public void cleanupBulkLoadedFiles(Set<Long> tids) {
for (Long tid : tids) {
bulkImported.invalidate(tid);
}
}
}