/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.accumulo.tserver.tablet;

import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.ManagerMetadataUtil;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

class DatafileManager {
private final Logger log = LoggerFactory.getLogger(DatafileManager.class);
// access to datafileSizes needs to be synchronized: see CompactionRunner#getNumFiles
private final Map<StoredTabletFile,DataFileValue> datafileSizes =
Collections.synchronizedMap(new TreeMap<>());
private final Tablet tablet;
// ensure we only have one reader/writer of our bulk file notes at a time
private final Object bulkFileImportLock = new Object();
// This must be incremented before and after datafileSizes and metadata table updates. These
// counts allow detection of overlapping operations w/o placing a lock around metadata table
// updates and datafileSizes updates. There is a periodic metadata consistency check that runs in
// the tablet server against all tablets. This check compares what a tablet object has in memory
// to what is in the metadata table to ensure they are in agreement. In order to avoid false
// positives, when this consistency check runs it needs to know if it overlaps in time with any
// metadata updates made by the tablet. The consistency check uses these counts to know that.
private final AtomicReference<MetadataUpdateCount> metadataUpdateCount;
DatafileManager(Tablet tablet, SortedMap<StoredTabletFile,DataFileValue> datafileSizes) {
this.datafileSizes.putAll(datafileSizes);
this.tablet = tablet;
this.metadataUpdateCount =
new AtomicReference<>(new MetadataUpdateCount(tablet.getExtent(), 0L, 0L));
}
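
// state for tracking files reserved by active scans; all access is synchronized on the tablet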
private final Set<TabletFile> filesToDeleteAfterScan = new HashSet<>();
private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new HashMap<>();
private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new MapCounter<>();
private long nextScanReservationId = 0;
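
// Wraps VolumeManager.rename(), converting a false return value into an IOException so all
// rename failures surface to callers the same way.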
static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
if (!fs.rename(src, dst)) {
throw new IOException("Rename " + src + " to " + dst + " returned false ");
}
}
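
/**
* Takes a snapshot of the tablet's current data files and increments a reference count on each
* so they are not deleted while a scan is reading them. Callers must release the reservation
* with returnFilesForScan(), for example (where {@code manager} is this tablet's
* DatafileManager):
*
* <pre>
* {@code
* Pair<Long,Map<TabletFile,DataFileValue>> reservation = manager.reserveFilesForScan();
* try {
*   // scan the files named in reservation.getSecond()
* } finally {
*   manager.returnFilesForScan(reservation.getFirst());
* }
* }
* </pre>
*/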
Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan() {
synchronized (tablet) {
Set<StoredTabletFile> absFilePaths = new HashSet<>(datafileSizes.keySet());
long rid = nextScanReservationId++;
scanFileReservations.put(rid, absFilePaths);
Map<TabletFile,DataFileValue> ret = new HashMap<>();
for (StoredTabletFile path : absFilePaths) {
fileScanReferenceCounts.increment(path, 1);
ret.put(path, datafileSizes.get(path));
}
return new Pair<>(rid, ret);
}
}
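
// Releases a reservation made by reserveFilesForScan(). When the last reference to a file that
// was queued for deletion is released, its scan reference is also removed from the metadata
// table.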
void returnFilesForScan(Long reservationId) {
final Set<StoredTabletFile> filesToDelete = new HashSet<>();
try {
synchronized (tablet) {
Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId);
if (absFilePaths == null) {
throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
}
boolean notify = false;
try {
for (StoredTabletFile path : absFilePaths) {
long refCount = fileScanReferenceCounts.decrement(path, 1);
if (refCount == 0) {
if (filesToDeleteAfterScan.remove(path)) {
filesToDelete.add(path);
}
notify = true;
} else if (refCount < 0) {
throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
}
}
} finally {
if (notify) {
tablet.notifyAll();
}
}
}
} finally {
// Remove scan refs even if the loop above did not fully complete: once a file has been moved
// into filesToDelete it has already been removed from filesToDeleteAfterScan and would never
// be added back.
if (!filesToDelete.isEmpty()) {
log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete);
MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(),
tablet.getTabletServer().getLock());
}
}
}
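
// Removes scan references for the given files: immediately if no scan currently holds a
// reference, otherwise after the last in-progress scan releases its reservation.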
void removeFilesAfterScan(Set<StoredTabletFile> scanFiles) {
if (scanFiles.isEmpty()) {
return;
}
Set<StoredTabletFile> filesToDelete = new HashSet<>();
synchronized (tablet) {
for (StoredTabletFile path : scanFiles) {
if (fileScanReferenceCounts.get(path) == 0) {
filesToDelete.add(path);
} else {
filesToDeleteAfterScan.add(path);
}
}
}
if (!filesToDelete.isEmpty()) {
log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete);
MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(),
tablet.getTabletServer().getLock());
}
}
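
// Waits up to 10 seconds for scan references on the given files to be released, returning the
// subset of files that are still in use when the wait gives up.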
private TreeSet<StoredTabletFile> waitForScansToFinish(Set<StoredTabletFile> pathsToWaitFor) {
long maxWait = 10000L;
long startTime = System.currentTimeMillis();
TreeSet<StoredTabletFile> inUse = new TreeSet<>();
Span span = TraceUtil.startSpan(this.getClass(), "waitForScans");
try (Scope scope = span.makeCurrent()) {
synchronized (tablet) {
for (StoredTabletFile path : pathsToWaitFor) {
while (fileScanReferenceCounts.get(path) > 0
&& System.currentTimeMillis() - startTime < maxWait) {
try {
tablet.wait(100);
} catch (InterruptedException e) {
log.warn("{}", e.getMessage(), e);
}
}
}
for (StoredTabletFile path : pathsToWaitFor) {
if (fileScanReferenceCounts.get(path) > 0) {
inUse.add(path);
}
}
}
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
} finally {
span.end();
}
return inUse;
}
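
/**
* Adds bulk imported files to the tablet: verifies that all files reside in a single bulk
* directory under this table's directory, records them in the metadata table, and then adds
* them to the in-memory file map.
*/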
public Collection<StoredTabletFile> importMapFiles(long tid, Map<TabletFile,DataFileValue> paths,
boolean setTime) throws IOException {
String bulkDir = null;
// once tablet files are inserted into the metadata they will become StoredTabletFiles
Map<StoredTabletFile,DataFileValue> newFiles = new HashMap<>(paths.size());
for (TabletFile tpath : paths.keySet()) {
boolean inTheRightDirectory = false;
Path parent = tpath.getPath().getParent().getParent();
for (String tablesDir : tablet.getContext().getTablesDirs()) {
if (parent.equals(new Path(tablesDir, tablet.getExtent().tableId().canonical()))) {
inTheRightDirectory = true;
break;
}
}
if (!inTheRightDirectory) {
throw new IOException("Data file " + tpath + " not in table dirs");
}
if (bulkDir == null) {
bulkDir = tpath.getTabletDir();
} else if (!bulkDir.equals(tpath.getTabletDir())) {
throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
}
}
if (tablet.getExtent().isMeta()) {
throw new IllegalArgumentException("Can not import files to a metadata tablet");
}
// increment start count before metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementStart);
// do not place any code here between above stmt and try{}finally
try {
synchronized (bulkFileImportLock) {
if (!paths.isEmpty()) {
long bulkTime = Long.MIN_VALUE;
if (setTime) {
for (DataFileValue dfv : paths.values()) {
long nextTime = tablet.getAndUpdateTime();
if (nextTime < bulkTime) {
throw new IllegalStateException(
"Time went backwards unexpectedly " + nextTime + " " + bulkTime);
}
bulkTime = nextTime;
dfv.setTime(bulkTime);
}
}
newFiles = tablet.updatePersistedTime(bulkTime, paths, tid);
}
}
synchronized (tablet) {
for (Entry<StoredTabletFile,DataFileValue> tpath : newFiles.entrySet()) {
if (datafileSizes.containsKey(tpath.getKey())) {
log.error("Adding file that is already in set {}", tpath.getKey());
}
datafileSizes.put(tpath.getKey(), tpath.getValue());
}
tablet.getTabletResources().importedMapFiles();
tablet.computeNumEntries();
}
for (Entry<StoredTabletFile,DataFileValue> entry : newFiles.entrySet()) {
TabletLogger.bulkImported(tablet.getExtent(), entry.getKey());
}
} catch (Exception e) {
// Any exception in this code is prone to leaving the persisted tablet metadata and the
// tablet's in-memory data structures out of sync. Log the extent and the exact files involved,
// as this may be useful for debugging.
log.error("Failure adding bulk import files {} {}", tablet.getExtent(), paths.keySet(), e);
throw e;
} finally {
// increment finish count after metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementFinish);
}
return newFiles.keySet();
}
/**
* Returns an Optional of the new file created. It is possible that the flush produced no
* entries, in which case no file was inserted into the metadata table and empty is returned.
* If the file was stored in the metadata table, then the StoredTabletFile is returned.
*/
Optional<StoredTabletFile> bringMinorCompactionOnline(TabletFile tmpDatafile,
TabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, long flushId) {
Optional<StoredTabletFile> newFile;
// rename before putting in metadata table, so files in metadata table should
// always exist
boolean attemptedRename = false;
VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager();
do {
try {
if (dfv.getNumEntries() == 0) {
log.debug("No data entries so delete temporary file {}", tmpDatafile);
vm.deleteRecursively(tmpDatafile.getPath());
} else {
if (!attemptedRename && vm.exists(newDatafile.getPath())) {
log.warn("Target map file already exists {}", newDatafile);
throw new RuntimeException("File unexpectedly exists " + newDatafile.getPath());
}
// the following detects a spurious rename failure, where a rename that actually succeeded
// still threw an IOException
if (attemptedRename && vm.exists(newDatafile.getPath())
&& !vm.exists(tmpDatafile.getPath())) {
// seems like previous rename succeeded, so break
break;
}
attemptedRename = true;
rename(vm, tmpDatafile.getPath(), newDatafile.getPath());
}
break;
} catch (IOException ioe) {
log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile
+ " after MinC, will retry in 60 secs...", ioe);
sleepUninterruptibly(1, TimeUnit.MINUTES);
}
} while (true);
long t1, t2;
// increment start count before metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementStart);
// do not place any code here between above stmt and following try{}finally
try {
// Should not hold the tablet lock while trying to acquire the log lock because this could
// lead to deadlock. However, there is a path in the code that does this. See #3759
tablet.getLogLock().lock();
// do not place any code here between lock and try
try {
// The following call pairs with tablet.finishClearingUnusedLogs() later in this block. If
// moving where the following method is called, examine it and finishClearingUnusedLogs()
// before moving.
Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
// The order of writing to the metadata table and the walog is important in the face of
// machine/process failures: the metadata must be written before the walog, because when the
// writes are done in the reverse order data could be lost. Also, the minor compaction start
// event should be written before the following metadata write is made.
newFile = tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, dfv,
unusedWalLogs, flushId);
// Mark that we have data we want to replicate. This WAL could still be in use by other
// tablets *from the same table*, so we can only mark that there is data to replicate, but it
// is *not* closed. We know it is not closed by the fact that this MinC triggered: a MinC
// cannot happen unless the tablet is online, and thus these WALs are referenced by that
// tablet. Therefore, the WAL replication status cannot be 'closed'.
@SuppressWarnings("deprecation")
boolean replicate = org.apache.accumulo.core.replication.ReplicationConfigurationUtil
.isEnabled(tablet.getExtent(), tablet.getTableConfiguration());
if (replicate) {
// unusedWalLogs is of the form host/fileURI, need to strip off the host portion
Set<String> logFileOnly = new HashSet<>();
for (String unusedWalLog : unusedWalLogs) {
int index = unusedWalLog.indexOf('/');
if (index == -1) {
log.warn(
"Could not find host component to strip from DFSLogger representation of WAL");
} else {
unusedWalLog = unusedWalLog.substring(index + 1);
}
logFileOnly.add(unusedWalLog);
}
if (log.isDebugEnabled()) {
log.debug("Recording that data has been ingested into {} using {}", tablet.getExtent(),
logFileOnly);
}
for (String logFile : logFileOnly) {
@SuppressWarnings("deprecation")
Status status =
org.apache.accumulo.server.replication.StatusUtil.openWithUnknownLength();
ReplicationTableUtil.updateFiles(tablet.getContext(), tablet.getExtent(), logFile,
status);
}
}
tablet.finishClearingUnusedLogs();
} finally {
tablet.getLogLock().unlock();
}
do {
try {
// the purpose of making this update use the new commit session, instead of the old one
// passed in, is that the new one will reference the logs used by current memory...
tablet.getTabletServer().minorCompactionFinished(
tablet.getTabletMemory().getCommitSession(), commitSession.getWALogSeq() + 2);
break;
} catch (IOException e) {
log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
} while (true);
synchronized (tablet) {
t1 = System.currentTimeMillis();
if (newFile.isPresent()) {
StoredTabletFile newFileStored = newFile.orElseThrow();
if (datafileSizes.containsKey(newFileStored)) {
log.error("Adding file that is already in set {}", newFileStored);
}
datafileSizes.put(newFileStored, dfv);
}
tablet.flushComplete(flushId);
t2 = System.currentTimeMillis();
}
} catch (Exception e) {
// Any exception in this code is prone to leaving the persisted tablet metadata and the
// tablet's in-memory data structures out of sync. Log the extent and the exact file involved,
// as this may be useful for debugging.
log.error("Failure adding minor compacted file {} {}", tablet.getExtent(), newDatafile, e);
throw e;
} finally {
// increment finish count after metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementFinish);
}
TabletLogger.flushed(tablet.getExtent(), newFile);
if (log.isTraceEnabled()) {
log.trace(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0,
tablet.getExtent().toString()));
}
long splitSize = tablet.getTableConfiguration().getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
if (dfv.getSize() > splitSize) {
log.debug(String.format("Minor Compaction wrote out file larger than split threshold."
+ " split threshold = %,d file size = %,d", splitSize, dfv.getSize()));
}
return newFile;
}
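
/**
* Atomically swaps the compacted input files for the new compaction output file, both in the
* metadata table and in the tablet's in-memory file map. Returns the new file, or empty if the
* compaction produced no entries.
*/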
Optional<StoredTabletFile> bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
TabletFile tmpDatafile, Long compactionId, Set<StoredTabletFile> selectedFiles,
DataFileValue dfv, Optional<ExternalCompactionId> ecid) throws IOException {
final KeyExtent extent = tablet.getExtent();
VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager();
long t1, t2;
TabletFile newDatafile = CompactableUtils.computeCompactionFileDest(tmpDatafile);
if (vm.exists(newDatafile.getPath())) {
log.error("Target map file already exist " + newDatafile, new Exception());
throw new IllegalStateException("Target map file already exist " + newDatafile);
}
if (dfv.getNumEntries() == 0) {
vm.deleteRecursively(tmpDatafile.getPath());
} else {
// rename before putting in metadata table, so files in metadata table should
// always exist
rename(vm, tmpDatafile.getPath(), newDatafile.getPath());
}
Location lastLocation = null;
Optional<StoredTabletFile> newFile;
if (dfv.getNumEntries() > 0) {
// calling insert to get the new file before inserting into the metadata
newFile = Optional.of(newDatafile.insert());
} else {
newFile = Optional.empty();
}
Long compactionIdToWrite = null;
// increment start count before metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementStart);
// do not place any code here between above stmt and try{}finally
try {
synchronized (tablet) {
t1 = System.currentTimeMillis();
Preconditions.checkState(datafileSizes.keySet().containsAll(oldDatafiles),
"Compacted files %s are not a subset of tablet files %s", oldDatafiles,
datafileSizes.keySet());
if (newFile.isPresent()) {
Preconditions.checkState(!datafileSizes.containsKey(newFile.orElseThrow()),
"New compaction file %s already exists in tablet files %s", newFile,
datafileSizes.keySet());
}
tablet.incrementDataSourceDeletions();
datafileSizes.keySet().removeAll(oldDatafiles);
if (newFile.isPresent()) {
datafileSizes.put(newFile.orElseThrow(), dfv);
// could be used by a follow-on compaction in a multipass compaction
}
tablet.computeNumEntries();
lastLocation = tablet.resetLastLocation();
if (compactionId != null && Collections.disjoint(selectedFiles, datafileSizes.keySet())) {
compactionIdToWrite = compactionId;
}
t2 = System.currentTimeMillis();
}
// known consistency issue between minor and major compactions - see ACCUMULO-18
Set<StoredTabletFile> filesInUseByScans = waitForScansToFinish(oldDatafiles);
if (!filesInUseByScans.isEmpty()) {
log.debug("Adding scan refs to metadata {} {}", extent, filesInUseByScans);
}
ManagerMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles,
filesInUseByScans, newFile, compactionIdToWrite, dfv,
tablet.getTabletServer().getTabletSession(), lastLocation,
tablet.getTabletServer().getLock(), ecid);
tablet.setLastCompactionID(compactionIdToWrite);
removeFilesAfterScan(filesInUseByScans);
} catch (Exception e) {
// Any exception in this code is prone to leaving the persisted tablet metadata and the
// tablet's in-memory data structures out of sync. Log the extent and the exact files involved,
// as this may be useful for debugging.
log.error("Failure updating files after major compaction {} {} {}", tablet.getExtent(),
newFile, oldDatafiles, e);
throw e;
} finally {
// increment finish count after metadata update AND updating in memory map of files
metadataUpdateCount.updateAndGet(MetadataUpdateCount::incrementFinish);
}
if (log.isTraceEnabled()) {
log.trace(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
}
return newFile;
}
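
// Returns an immutable snapshot of the current file sizes; the copy is made while holding the
// tablet lock so it is consistent with concurrent metadata updates.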
public SortedMap<StoredTabletFile,DataFileValue> getDatafileSizes() {
synchronized (tablet) {
TreeMap<StoredTabletFile,DataFileValue> copy = new TreeMap<>(datafileSizes);
return Collections.unmodifiableSortedMap(copy);
}
}
public Set<TabletFile> getFiles() {
synchronized (tablet) {
HashSet<TabletFile> files = new HashSet<>(datafileSizes.keySet());
return Collections.unmodifiableSet(files);
}
}
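
// datafileSizes is a synchronized map, so its size can be read safely without holding the
// tablet lock; see the note on datafileSizes above.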
public int getNumFiles() {
return datafileSizes.size();
}
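
// Exposes the start/finish counts used by the tablet server's periodic metadata consistency
// check; see the comment on metadataUpdateCount.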
public MetadataUpdateCount getUpdateCount() {
return metadataUpdateCount.get();
}
}