blob: 159a3eb53f41287e0c23a0123f8ea9085bb9b8fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
public class FileManager {
private static final Logger log = LoggerFactory.getLogger(FileManager.class);
private int maxOpen;
/**
 * An idle reader for one file, stamped with the time it was released back to the manager.
 *
 * <p>
 * The natural ordering looks at the release time only (oldest first), which is what the LRU
 * eviction sort needs. Equality is intentionally stricter than the ordering: two instances are
 * equal only when they describe the same reservation (same file name, same reader instance and
 * same release time). This keeps {@code equals} consistent with {@code hashCode}, which hashes the
 * file name, and guarantees that {@code List.remove(Object)} can never pick a different reader
 * that merely happened to be released in the same millisecond.
 */
private static class OpenReader implements Comparable<OpenReader> {
  final long releaseTime;
  final FileSKVIterator reader;
  final String fileName;

  /**
   * @param fileName name of the file the reader was opened for
   * @param reader the open reader being parked
   */
  public OpenReader(String fileName, FileSKVIterator reader) {
    this.fileName = fileName;
    this.reader = reader;
    this.releaseTime = System.currentTimeMillis();
  }

  /** Orders by release time, oldest first. Not consistent with {@link #equals(Object)}. */
  @Override
  public int compareTo(OpenReader o) {
    return Long.compare(releaseTime, o.releaseTime);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof OpenReader)) {
      return false;
    }
    OpenReader other = (OpenReader) obj;
    // reader is compared by identity: each open reader is a distinct resource
    return releaseTime == other.releaseTime && reader == other.reader
        && fileName.equals(other.fileName);
  }

  /** Hashes the file name; equal instances always share a file name, so the contract holds. */
  @Override
  public int hashCode() {
    return fileName.hashCode();
  }
}
private Map<String,List<OpenReader>> openFiles;
private HashMap<FileSKVIterator,String> reservedReaders;
private Semaphore filePermits;
private VolumeManager fs;
private Cache<String,Long> fileLenCache;
private long maxIdleTime;
private long slowFilePermitMillis;
private final ServerContext context;
/**
 * Periodic task that drops readers which have been sitting idle in {@code openFiles} for longer
 * than {@code maxIdleTime} and closes them.
 */
private class IdleFileCloser implements Runnable {
  @Override
  public void run() {
    final long now = System.currentTimeMillis();
    final ArrayList<FileSKVIterator> expired = new ArrayList<>();

    // Only the bookkeeping is done while holding the FileManager lock; the readers themselves
    // are closed after the lock has been released.
    synchronized (FileManager.this) {
      Iterator<List<OpenReader>> perFileLists = openFiles.values().iterator();
      while (perFileLists.hasNext()) {
        List<OpenReader> idleReaders = perFileLists.next();
        idleReaders.removeIf(candidate -> {
          if (now - candidate.releaseTime <= maxIdleTime) {
            return false;
          }
          expired.add(candidate.reader);
          return true;
        });
        // do not keep empty lists around in the map
        if (idleReaders.isEmpty()) {
          perFileLists.remove();
        }
      }
    }

    closeReaders(expired);
  }
}
/**
 * Creates a file manager and schedules the background task that closes idle readers.
 *
 * @param context server context; supplies the configuration read here and is kept for later use
 *        when opening files
 * @param fs volume manager used to look up the file system for each file path
 * @param maxOpen number of file permits and upper bound used when deciding how many idle readers
 *        to evict; must be positive
 * @param fileLenCache file length cache passed to every reader that is built
 * @throws IllegalArgumentException if {@code maxOpen <= 0}
 */
public FileManager(ServerContext context, VolumeManager fs, int maxOpen,
    Cache<String,Long> fileLenCache) {
  if (maxOpen <= 0) {
    throw new IllegalArgumentException("maxOpen <= 0");
  }
  this.context = context;
  this.fileLenCache = fileLenCache;
  this.filePermits = new Semaphore(maxOpen, false);
  this.maxOpen = maxOpen;
  this.fs = fs;
  this.openFiles = new HashMap<>();
  this.reservedReaders = new HashMap<>();
  this.maxIdleTime = context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
  this.slowFilePermitMillis =
      context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);

  // Scheduled last on purpose: IdleFileCloser is an inner class and therefore captures "this".
  // Handing it to the timer only after every field has been assigned avoids publishing a
  // partially constructed FileManager to the timer thread.
  SimpleTimer.getInstance(context.getConfiguration()).schedule(new IdleFileCloser(), maxIdleTime,
      maxIdleTime / 2);
}
/** Returns the total number of readers held across all per-file lists of {@code files}. */
private static int countReaders(Map<String,List<OpenReader>> files) {
  return files.values().stream().mapToInt(List::size).sum();
}
/**
 * Removes up to {@code numToTake} of the least recently released readers from
 * {@code openFiles} and returns them; closing them is left to the caller.
 *
 * @param numToTake maximum number of idle readers to remove
 * @return the removed readers, oldest release time first
 * @throws RuntimeException if a reader that was just collected can no longer be removed
 */
private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
  // flatten every per-file list and order the result by release time (oldest first)
  ArrayList<OpenReader> byAge = new ArrayList<>();
  openFiles.values().forEach(byAge::addAll);
  Collections.sort(byAge);

  ArrayList<FileSKVIterator> evicted = new ArrayList<>();
  int limit = Math.min(numToTake, byAge.size());
  for (int idx = 0; idx < limit; idx++) {
    OpenReader oldest = byAge.get(idx);
    List<OpenReader> siblings = openFiles.get(oldest.fileName);
    boolean removed = siblings.remove(oldest);
    if (!removed) {
      throw new RuntimeException("Failed to remove open reader that should have been there");
    }
    // drop the map entry once the last idle reader for the file is gone
    if (siblings.isEmpty()) {
      openFiles.remove(oldest.fileName);
    }
    evicted.add(oldest.reader);
  }
  return evicted;
}
/**
 * Closes every reader in {@code filesToClose}. A failure to close one reader is logged and does
 * not stop the remaining readers from being closed.
 */
private void closeReaders(Collection<FileSKVIterator> filesToClose) {
  Iterator<FileSKVIterator> pending = filesToClose.iterator();
  while (pending.hasNext()) {
    FileSKVIterator reader = pending.next();
    try {
      reader.close();
    } catch (Exception e) {
      log.error("Failed to close file {}", e.getMessage(), e);
    }
  }
}
/**
 * Satisfies as many of the requested files as possible from the idle readers in
 * {@code openFiles}.
 *
 * @param files names of the files a reader is wanted for
 * @param readersReserved receives a reader-to-file entry for every request served from an idle
 *        reader
 * @return the names of the files for which no idle reader was available
 */
private List<String> takeOpenFiles(Collection<String> files,
    Map<FileSKVIterator,String> readersReserved) {
  List<String> missing = Collections.emptyList();
  for (String file : files) {
    List<OpenReader> idle = openFiles.get(file);
    if (idle == null || idle.isEmpty()) {
      // the result list is only allocated once the first miss is seen
      if (missing.isEmpty()) {
        missing = new ArrayList<>(files.size());
      }
      missing.add(file);
      continue;
    }

    // take the reader at the tail of the list
    OpenReader taken = idle.remove(idle.size() - 1);
    readersReserved.put(taken.reader, file);
    if (idle.isEmpty()) {
      openFiles.remove(file);
    }
  }
  return missing;
}
/**
 * Reserves one reader per requested file, reusing idle readers from {@code openFiles} where
 * possible and opening the remaining files.
 *
 * <p>
 * For non-metadata tablets one permit per requested file is acquired from {@code filePermits}
 * before any work is done; the call blocks (uninterruptibly) until the permits are available.
 * Every reader that is returned is also recorded in {@code reservedReaders}.
 *
 * @param tablet extent the files are reserved for; metadata extents bypass the permit accounting
 * @param files names of the files to reserve readers for
 * @param continueOnFailure when true, a file that cannot be opened is reported, logged and
 *        skipped; when false, the first failure aborts the whole reservation
 * @param cacheProvider cache provider set on reused readers and used when building new ones
 * @return map from reserved reader to the file name it was reserved for; empty when
 *         {@code files} is empty
 * @throws IllegalArgumentException if a non-metadata tablet requests {@code maxOpen} or more
 *         files
 * @throws IOException if a file cannot be opened and {@code continueOnFailure} is false
 */
private Map<FileSKVIterator,String> reserveReaders(KeyExtent tablet, Collection<String> files,
boolean continueOnFailure, CacheProvider cacheProvider) throws IOException {
// note the >=: a single non-metadata request must stay strictly below maxOpen
if (!tablet.isMeta() && files.size() >= maxOpen) {
throw new IllegalArgumentException("requested files exceeds max open");
}
if (files.isEmpty()) {
return Collections.emptyMap();
}
List<String> filesToOpen = null;
List<FileSKVIterator> filesToClose = Collections.emptyList();
Map<FileSKVIterator,String> readersReserved = new HashMap<>();
if (!tablet.isMeta()) {
// measure how long the permits took so that slow acquisitions can be logged
long start = System.currentTimeMillis();
filePermits.acquireUninterruptibly(files.size());
long waitTime = System.currentTimeMillis() - start;
if (waitTime >= slowFilePermitMillis) {
log.info("Slow file permits request: {} ms, files requested: {}, tablet: {}", waitTime,
files.size(), tablet);
}
}
// now that we are past the semaphore, we have the authority
// to open files.size() files
// determine what work needs to be done in the sync block,
// but do the work of opening and closing files outside
// of the sync block
synchronized (this) {
filesToOpen = takeOpenFiles(files, readersReserved);
if (!filesToOpen.isEmpty()) {
int numOpen = countReaders(openFiles);
// evict just enough idle readers so that idle + reserved + to-be-opened fits in maxOpen
if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
filesToClose =
takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
}
}
}
// at this point readersReserved only holds readers reused from openFiles
readersReserved.forEach((k, v) -> k.setCacheProvider(cacheProvider));
// close files before opening files to ensure we stay under resource
// limitations
closeReaders(filesToClose);
// open any files that need to be opened
for (String file : filesToOpen) {
try {
if (!file.contains(":"))
throw new IllegalArgumentException("Expected uri, got : " + file);
Path path = new Path(file);
FileSystem ns = fs.getFileSystemByPath(path);
// log.debug("Opening "+file + " path " + path);
FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(path.toString(), ns, ns.getConf(), context.getCryptoService())
.withTableConfiguration(context.getTableConfiguration(tablet.tableId()))
.withCacheProvider(cacheProvider).withFileLenCache(fileLenCache).build();
readersReserved.put(reader, file);
} catch (Exception e) {
// every failure is recorded as a FILE_READ problem, whether or not we continue
ProblemReports.getInstance(context)
.report(new ProblemReport(tablet.tableId(), ProblemType.FILE_READ, file, e));
if (continueOnFailure) {
// release the permit for the file that failed to open
if (!tablet.isMeta()) {
filePermits.release(1);
}
log.warn("Failed to open file {} {} continuing...", file, e.getMessage(), e);
} else {
// close whatever files were opened; this also closes the readers that were
// taken from openFiles, since they are in readersReserved as well
closeReaders(readersReserved.keySet());
// give back all permits acquired for this request
if (!tablet.isMeta()) {
filePermits.release(files.size());
}
log.error("Failed to open file {} {}", file, e.getMessage());
throw new IOException("Failed to open " + file, e);
}
}
}
synchronized (this) {
// update set of reserved readers
reservedReaders.putAll(readersReserved);
}
return readersReserved;
}
/**
 * Returns previously reserved readers to the manager.
 *
 * <p>
 * Deep copies of every reader are closed first. If that fails for any reader, or the caller
 * already saw an I/O error, all of the readers are closed instead of being parked in
 * {@code openFiles} for reuse. Permits are given back for non-metadata tablets.
 *
 * @param tablet extent the readers were reserved for
 * @param readers readers to release; all of them must currently be reserved
 * @param sawIOException true if the caller hit an I/O error while using the readers
 * @throws IllegalArgumentException if any of the readers is not currently reserved
 */
private void releaseReaders(KeyExtent tablet, List<FileSKVIterator> readers,
    boolean sawIOException) {
  boolean discard = sawIOException;

  synchronized (this) {
    // guard against a thread releasing readers it never reserved
    if (!reservedReaders.keySet().containsAll(readers)) {
      throw new IllegalArgumentException("Asked to release readers that were never reserved ");
    }

    // first pass: close deep copies of every reader, remembering whether any of that failed
    for (FileSKVIterator reader : readers) {
      try {
        reader.closeDeepCopies();
      } catch (IOException e) {
        log.warn("{}", e.getMessage(), e);
        discard = true;
      }
    }

    // second pass: un-reserve, and park the reader for reuse unless it is being discarded
    for (FileSKVIterator reader : readers) {
      String fileName = reservedReaders.remove(reader);
      if (discard) {
        continue;
      }
      List<OpenReader> idle = openFiles.get(fileName);
      if (idle == null) {
        idle = new ArrayList<>();
        openFiles.put(fileName, idle);
      }
      idle.add(new OpenReader(fileName, reader));
    }
  }

  if (discard) {
    closeReaders(readers);
  }

  // give the permits back
  if (!tablet.isMeta()) {
    filePermits.release(readers.size());
  }
}
/**
 * {@link DataSource} wrapping an iterator over a single file. The wrapped iterator can be taken
 * away with {@link #unsetIterator()} and replaced later with
 * {@link #setIterator(SortedKeyValueIterator)}; both operations also update every deep copy
 * created through {@link #getDeepCopyDataSource(IteratorEnvironment)}.
 */
static class FileDataSource implements DataSource {
  private SortedKeyValueIterator<Key,Value> iter;
  // the same list instance is shared by a source and all deep copies derived from it
  private final ArrayList<FileDataSource> deepCopies;
  private boolean current = true;
  // environment used to re-create a deep copy; null for a source built from a file name
  private final IteratorEnvironment env;
  // file name; null for deep copies
  private final String file;
  // last interrupt flag handed to this source, re-applied whenever the iterator is replaced
  private AtomicBoolean iflag;

  /**
   * Creates a source for {@code file} backed by {@code iter}.
   */
  FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
    this.file = file;
    this.iter = iter;
    this.env = null;
    this.deepCopies = new ArrayList<>();
  }

  /**
   * Creates a deep-copy source and registers it in the shared {@code deepCopies} list.
   */
  public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy,
      ArrayList<FileDataSource> deepCopies) {
    this.file = null;
    this.iter = deepCopy;
    this.env = env;
    this.deepCopies = deepCopies;
    deepCopies.add(this);
  }

  @Override
  public boolean isCurrent() {
    return current;
  }

  @Override
  public DataSource getNewDataSource() {
    current = true;
    return this;
  }

  @Override
  public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
    return new FileDataSource(env, iter.deepCopy(env), deepCopies);
  }

  @Override
  public SortedKeyValueIterator<Key,Value> iterator() {
    return iter;
  }

  /** Drops the iterator of this source and of every deep copy and marks them not current. */
  void unsetIterator() {
    current = false;
    iter = null;
    for (FileDataSource fds : deepCopies) {
      fds.current = false;
      fds.iter = null;
    }
  }

  /**
   * Installs a new iterator, re-applies the stored interrupt flag (if any) and gives every deep
   * copy a fresh deep copy of {@code iter}. All affected sources are marked not current.
   */
  void setIterator(SortedKeyValueIterator<Key,Value> iter) {
    current = false;
    this.iter = iter;
    if (iflag != null) {
      ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
    }
    for (FileDataSource fds : deepCopies) {
      fds.current = false;
      fds.iter = iter.deepCopy(fds.env);
    }
  }

  /**
   * Remembers {@code flag} and applies it to the current iterator. When no iterator is set (after
   * {@link #unsetIterator()}), the flag is only stored; {@link #setIterator} applies it once an
   * iterator is installed again.
   */
  @Override
  public void setInterruptFlag(AtomicBoolean flag) {
    this.iflag = flag;
    if (this.iter != null) {
      ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
    }
  }
}
public class ScanFileManager {
private ArrayList<FileDataSource> dataSources;
private ArrayList<FileSKVIterator> tabletReservedReaders;
private KeyExtent tablet;
private boolean continueOnFailure;
private CacheProvider cacheProvider;
ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
tabletReservedReaders = new ArrayList<>();
dataSources = new ArrayList<>();
this.tablet = tablet;
this.cacheProvider = cacheProvider;
continueOnFailure = context.getTableConfiguration(tablet.tableId())
.getBoolean(Property.TABLE_FAILURES_IGNORE);
if (tablet.isMeta()) {
continueOnFailure = false;
}
}
/**
 * Reserves readers for {@code files} on behalf of this tablet and records them in
 * {@code tabletReservedReaders}.
 *
 * @param files names of the files to reserve readers for
 * @return map from newly reserved reader to file name
 * @throws TooManyFilesException if the readers already held plus the requested files reach
 *         {@code maxOpen}
 * @throws IOException if reserving the readers fails
 */
private Map<FileSKVIterator,String> openFiles(List<String> files)
    throws TooManyFilesException, IOException {
  final int alreadyHeld = tabletReservedReaders.size();
  final int requested = files.size();

  // a single tablet must stay below maxOpen, otherwise it could wait forever on permits that
  // only it can release
  if (alreadyHeld + requested >= maxOpen) {
    throw new TooManyFilesException(
        "Request to open files would exceed max open files reservedReaders.size()=" + alreadyHeld
            + " files.size()=" + requested + " maxOpen=" + maxOpen + " tablet = " + tablet);
  }

  Map<FileSKVIterator,String> reserved =
      reserveReaders(tablet, files, continueOnFailure, cacheProvider);
  tabletReservedReaders.addAll(reserved.keySet());
  return reserved;
}
/**
 * Reserves readers for the given files and wraps each one in the iterator stack used for
 * scanning.
 *
 * @param files files to open, with their metadata values
 * @param detachable when true, each reader is placed behind a {@link SourceSwitchingIterator}
 *        and remembered in {@code dataSources} so it can be detached and reattached later
 * @param samplerConfig when non-null, the sample of each file matching this configuration is
 *        read instead of the file itself
 * @return one iterator per reader that was reserved
 * @throws SampleNotPresentException if a file has no sample for {@code samplerConfig}
 * @throws IOException if the readers cannot be reserved
 */
public synchronized List<InterruptibleIterator> openFiles(Map<TabletFile,DataFileValue> files,
    boolean detachable, SamplerConfigurationImpl samplerConfig) throws IOException {
  List<String> paths =
      files.keySet().stream().map(TabletFile::getPathStr).collect(Collectors.toList());
  Map<FileSKVIterator,String> reserved = openFiles(paths);

  // the per-file lookup further down is only needed when at least one file has a time set
  boolean anyTimeSet = files.values().stream().anyMatch(DataFileValue::isTimeSet);

  ArrayList<InterruptibleIterator> result = new ArrayList<>();
  for (Entry<FileSKVIterator,String> reservation : reserved.entrySet()) {
    String filename = reservation.getValue();

    FileSKVIterator source = reservation.getKey();
    if (samplerConfig != null) {
      source = source.getSample(samplerConfig);
      if (source == null) {
        throw new SampleNotPresentException();
      }
    }

    InterruptibleIterator iter;
    if (!detachable) {
      iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
          continueOnFailure, source);
    } else {
      FileDataSource fds = new FileDataSource(filename, source);
      dataSources.add(fds);
      SourceSwitchingIterator switching = new SourceSwitchingIterator(fds);
      iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
          continueOnFailure, switching);
    }

    if (anyTimeSet) {
      // constructing a TabletFile is expensive, so it is only done when needed
      DataFileValue value = files.get(new TabletFile(new Path(filename)));
      if (value.isTimeSet()) {
        iter = new TimeSettingIterator(iter, value.getTime());
      }
    }

    result.add(iter);
  }
  return result;
}
/**
 * Releases every reader held for this tablet back to the file manager and removes the iterator
 * from each data source; {@code dataSources} itself is kept so the files can be reattached.
 */
public synchronized void detach() {
  releaseReaders(tablet, tabletReservedReaders, false);
  tabletReservedReaders.clear();
  dataSources.forEach(FileDataSource::unsetIterator);
}
/**
 * Re-reserves a reader for every detached data source and installs it as that source's
 * iterator.
 *
 * @param samplerConfig when non-null, the sample of each file matching this configuration is
 *        installed instead of the file reader itself
 * @throws IllegalStateException if readers are still reserved, i.e. the files were not detached
 * @throws SampleNotPresentException if a file has no sample for {@code samplerConfig}
 * @throws IOException if readers cannot be reserved, or if no reader could be obtained for one
 *         of the data sources
 */
public synchronized void reattach(SamplerConfigurationImpl samplerConfig) throws IOException {
  if (!tabletReservedReaders.isEmpty()) {
    throw new IllegalStateException();
  }

  List<String> files = dataSources.stream().map(x -> x.file).collect(Collectors.toList());
  Map<FileSKVIterator,String> newlyReservedReaders = openFiles(files);

  // group the readers by file name; the same file may back more than one data source
  Map<String,List<FileSKVIterator>> map = new HashMap<>();
  newlyReservedReaders.forEach(
      (reader, fileName) -> map.computeIfAbsent(fileName, k -> new LinkedList<>()).add(reader));

  for (FileDataSource fds : dataSources) {
    List<FileSKVIterator> available = map.get(fds.file);
    // When open failures are tolerated, a file that failed to open is simply missing from the
    // reserved readers. Report that clearly instead of failing with a NullPointerException or
    // IndexOutOfBoundsException.
    if (available == null || available.isEmpty()) {
      throw new IOException(
          "Failed to reattach, no reader available for file " + fds.file + " tablet = " + tablet);
    }
    FileSKVIterator source = available.remove(0);
    if (samplerConfig != null) {
      source = source.getSample(samplerConfig);
      if (source == null) {
        throw new SampleNotPresentException();
      }
    }
    fds.setIterator(source);
  }
}
/**
 * Releases every reader held for this tablet and forgets all data sources.
 *
 * @param sawIOException true if an I/O error was seen while the readers were in use; passed on
 *        to the file manager when releasing the readers
 */
public synchronized void releaseOpenFiles(boolean sawIOException) {
  releaseReaders(tablet, tabletReservedReaders, sawIOException);
  dataSources.clear();
  tabletReservedReaders.clear();
}
/** Returns the number of readers currently reserved for this tablet. */
public synchronized int getNumOpenFiles() {
  final int reservedCount = tabletReservedReaders.size();
  return reservedCount;
}
}
/**
 * Creates a {@link ScanFileManager} bound to this file manager.
 *
 * @param tablet extent whose files will be managed
 * @param cacheProvider cache provider to use for the tablet's readers
 * @return a new scan file manager with no files open
 */
public ScanFileManager newScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
  ScanFileManager manager = new ScanFileManager(tablet, cacheProvider);
  return manager;
}
}