blob: c84335a763e879ea2f385eec7bd3e6f82a2f4db5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.impl.prefetch;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
/**
 * Provides functionality necessary for caching blocks of data read from FileSystem.
 * Each cache block is stored on the local disk as a separate file.
 *
 * <p>For every block a size and a checksum are recorded when the block is added;
 * both are verified again whenever the block is read back or re-added.
 * All cache files tracked by this instance are deleted on {@link #close()}.
 *
 * <p>NOTE(review): {@code numGets} and {@code closed} are plain (non-volatile,
 * non-atomic) fields while {@code blocks} is a concurrent map; confirm the
 * threading expectations of callers before relying on them across threads.
 */
public class SingleFilePerBlockCache implements BlockCache {
  private static final Logger LOG = LoggerFactory.getLogger(SingleFilePerBlockCache.class);

  /**
   * Blocks stored in this cache.
   */
  private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();

  /**
   * Number of times a block was read from this cache.
   * Used for determining cache utilization factor.
   */
  private int numGets = 0;

  /**
   * Set once {@link #close()} has run; {@code get} and {@code put} become no-ops afterwards.
   */
  private boolean closed;

  private final PrefetchingStatistics prefetchingStatistics;

  /**
   * Cache entry.
   * Each block is stored as a separate file.
   */
  private static final class Entry {
    private final int blockNumber;
    private final Path path;
    private final int size;
    private final long checksum;

    Entry(int blockNumber, Path path, int size, long checksum) {
      this.blockNumber = blockNumber;
      this.path = path;
      this.size = size;
      this.checksum = checksum;
    }

    @Override
    public String toString() {
      return String.format(
          "([%03d] %s: size = %d, checksum = %d)",
          blockNumber, path, size, checksum);
    }
  }

  /**
   * Constructs an instance of a {@code SingleFilePerBlockCache}.
   *
   * @param prefetchingStatistics statistics for this stream.
   * @throws NullPointerException if prefetchingStatistics is null.
   */
  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
  }

  /**
   * Indicates whether the given block is in this cache.
   *
   * @param blockNumber the block to look up.
   * @return true if an entry for the block exists.
   */
  @Override
  public boolean containsBlock(int blockNumber) {
    return blocks.containsKey(blockNumber);
  }

  /**
   * Gets the blocks in this cache.
   *
   * @return an unmodifiable snapshot of the block numbers currently cached.
   */
  @Override
  public Iterable<Integer> blocks() {
    return Collections.unmodifiableList(new ArrayList<>(blocks.keySet()));
  }

  /**
   * Gets the number of blocks in this cache.
   *
   * @return the number of cached blocks.
   */
  @Override
  public int size() {
    return blocks.size();
  }

  /**
   * Gets the block having the given {@code blockNumber}.
   * The buffer is cleared, filled from the cache file, rewound, and then checked
   * against the size and checksum recorded for the block.
   * Does nothing if this cache has been closed.
   *
   * @param blockNumber the block to read.
   * @param buffer destination for the block data.
   * @throws IOException if the cache file cannot be read.
   * @throws IllegalArgumentException if buffer is null.
   * @throws IllegalStateException if the block is not in the cache, or if the data
   *     read does not match the recorded size or checksum.
   */
  @Override
  public void get(int blockNumber, ByteBuffer buffer) throws IOException {
    if (closed) {
      return;
    }

    checkNotNull(buffer, "buffer");

    Entry entry = getEntry(blockNumber);
    buffer.clear();
    readFile(entry.path, buffer);
    buffer.rewind();

    validateEntry(entry, buffer);
  }

  /**
   * Reads the file at {@code path} into {@code buffer}, stopping when the channel
   * reports no more bytes transferred (end of file or buffer full). On return the
   * buffer's limit is set to its position.
   *
   * @param path file to read.
   * @param buffer destination buffer.
   * @return the number of bytes read.
   * @throws IOException if the file cannot be opened or read.
   */
  protected int readFile(Path path, ByteBuffer buffer) throws IOException {
    int numBytesRead = 0;
    // try-with-resources guarantees the channel is closed even when a read fails.
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
      int numBytes;
      while ((numBytes = channel.read(buffer)) > 0) {
        numBytesRead += numBytes;
      }
      buffer.limit(buffer.position());
    }
    return numBytesRead;
  }

  /**
   * Looks up the entry for the given block and counts the access.
   *
   * @throws IllegalStateException if the block is not in the cache.
   */
  private Entry getEntry(int blockNumber) {
    Validate.checkNotNegative(blockNumber, "blockNumber");

    Entry entry = blocks.get(blockNumber);
    if (entry == null) {
      throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
    }
    numGets++;
    return entry;
  }

  /**
   * Puts the given block in this cache.
   * If the block is already cached, the supplied buffer is only validated against
   * the existing entry. Does nothing if this cache has been closed.
   *
   * @param blockNumber the block being added.
   * @param buffer the block data.
   * @throws IOException if the cache file cannot be created or written.
   * @throws IllegalArgumentException if buffer is null.
   * @throws IllegalArgumentException if buffer.limit() is zero or negative.
   * @throws IllegalStateException if the block is already cached with a different
   *     size or checksum, or if the new cache file unexpectedly contains data.
   */
  @Override
  public void put(int blockNumber, ByteBuffer buffer) throws IOException {
    if (closed) {
      return;
    }

    checkNotNull(buffer, "buffer");

    // Single lookup: avoids a second map access and a null entry if the map changes in between.
    Entry existing = blocks.get(blockNumber);
    if (existing != null) {
      validateEntry(existing, buffer);
      return;
    }

    Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");

    Path blockFilePath = getCacheFilePath();
    long size = Files.size(blockFilePath);
    if (size != 0) {
      String message =
          String.format("[%d] temp file already has data. %s (%d)",
              blockNumber, blockFilePath, size);
      throw new IllegalStateException(message);
    }

    try {
      writeFile(blockFilePath, buffer);
    } catch (IOException | RuntimeException e) {
      // The file is not tracked in 'blocks' yet, so close() would never remove it;
      // clean it up here rather than leaving a partial file behind.
      try {
        Files.deleteIfExists(blockFilePath);
      } catch (IOException deleteFailure) {
        e.addSuppressed(deleteFailure);
      }
      throw e;
    }

    prefetchingStatistics.blockAddedToFileCache();
    long checksum = BufferData.getChecksum(buffer);
    Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
    blocks.put(blockNumber, entry);
  }

  /**
   * Options used when opening a cache file for writing.
   */
  private static final Set<? extends OpenOption> CREATE_OPTIONS =
      EnumSet.of(StandardOpenOption.WRITE,
          StandardOpenOption.CREATE,
          StandardOpenOption.TRUNCATE_EXISTING);

  /**
   * Writes the whole buffer (from position zero up to its limit) to {@code path},
   * creating the file if needed and truncating any existing content.
   *
   * @param path file to write.
   * @param buffer data to write; it is rewound before writing.
   * @throws IOException if the file cannot be opened or written.
   */
  protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
    buffer.rewind();
    // try-with-resources guarantees the channel is closed even when a write fails.
    try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS)) {
      while (buffer.hasRemaining()) {
        writeChannel.write(buffer);
      }
    }
  }

  /**
   * Returns the path of a new file in which a block will be stored.
   *
   * @return path of a newly created temporary file.
   * @throws IOException if the file cannot be created.
   */
  protected Path getCacheFilePath() throws IOException {
    return getTempFilePath();
  }

  /**
   * Closes this cache and deletes the cache files it tracks.
   * Deletion is best effort: a failure to delete one file does not prevent
   * the remaining files from being processed. Subsequent calls are no-ops.
   */
  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }

    closed = true;

    LOG.info(getStats());
    int numFilesDeleted = 0;

    for (Entry entry : blocks.values()) {
      try {
        Files.deleteIfExists(entry.path);
        prefetchingStatistics.blockRemovedFromFileCache();
        numFilesDeleted++;
      } catch (IOException e) {
        // Keep going so that we can delete as many cache files as possible,
        // but leave a trace of what could not be removed.
        LOG.debug("Failed to delete cache file {}", entry.path, e);
      }
    }

    if (numFilesDeleted > 0) {
      LOG.info("Deleted {} cache files", numFilesDeleted);
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("stats: ");
    sb.append(getStats());
    sb.append(", blocks:[");
    sb.append(getIntList(blocks()));
    sb.append("]");
    return sb.toString();
  }

  /**
   * Verifies that the buffer matches the size and checksum recorded in the entry.
   *
   * @throws IllegalStateException if either the size or the checksum differs.
   */
  private void validateEntry(Entry entry, ByteBuffer buffer) {
    if (entry.size != buffer.limit()) {
      String message = String.format(
          "[%d] entry.size(%d) != buffer.limit(%d)",
          entry.blockNumber, entry.size, buffer.limit());
      throw new IllegalStateException(message);
    }

    long checksum = BufferData.getChecksum(buffer);
    if (entry.checksum != checksum) {
      String message = String.format(
          "[%d] entry.checksum(%d) != buffer checksum(%d)",
          entry.blockNumber, entry.checksum, checksum);
      throw new IllegalStateException(message);
    }
  }

  /**
   * Produces a human readable list of blocks for the purpose of logging.
   * This method minimizes the length of returned list by converting
   * a contiguous list of blocks into a range.
   * for example,
   * 1, 3, 4, 5, 6, 8 becomes 1, 3~6, 8
   */
  private String getIntList(Iterable<Integer> nums) {
    List<Integer> numbers = new ArrayList<>();
    for (Integer n : nums) {
      numbers.add(n);
    }
    Collections.sort(numbers);

    List<String> ranges = new ArrayList<>();
    int index = 0;
    while (index < numbers.size()) {
      int start = numbers.get(index);
      int last = start;
      index++;
      // Extend the current run while the next number directly follows the last one.
      while (index < numbers.size() && numbers.get(index) == last + 1) {
        last = numbers.get(index);
        index++;
      }

      if (start == last) {
        ranges.add(Integer.toString(start));
      } else {
        ranges.add(String.format("%d~%d", start, last));
      }
    }

    return String.join(", ", ranges);
  }

  /**
   * Returns a short summary of the entry count and the number of reads served.
   */
  private String getStats() {
    return String.format(
        "#entries = %d, #gets = %d",
        blocks.size(), numGets);
  }

  // Name prefix of every cache file created by this class.
  private static final String CACHE_FILE_PREFIX = "fs-cache-";

  /**
   * Checks whether the location used for cache files reports more usable space
   * than {@code fileSize}. A temporary probe file is created to locate that
   * location and is deleted again before returning.
   *
   * @param fileSize the number of bytes that would need to be cached.
   * @return true if {@code fileSize} is strictly less than the usable space;
   *     false otherwise, or if the probe file could not be created or deleted.
   */
  public static boolean isCacheSpaceAvailable(long fileSize) {
    try {
      Path cacheFilePath = getTempFilePath();
      try {
        long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
        LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
        return fileSize < freeSpace;
      } finally {
        // Always remove the probe file; a failure here is reported via the catch below.
        Files.deleteIfExists(cacheFilePath);
      }
    } catch (IOException e) {
      LOG.error("isCacheSpaceAvailable", e);
      return false;
    }
  }

  // The suffix (file extension) of each cache block file.
  private static final String BINARY_FILE_SUFFIX = ".bin";

  // File attributes attached to each cache file on creation: owner read/write only.
  // NOTE(review): POSIX permissions are requested unconditionally; confirm behavior
  // on file systems that do not support POSIX file attributes.
  private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
      PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
          PosixFilePermission.OWNER_WRITE));

  /**
   * Creates a new, empty temporary file for holding a cache block.
   *
   * @return path of the created file.
   * @throws IOException if the file cannot be created.
   */
  private static Path getTempFilePath() throws IOException {
    return Files.createTempFile(
        CACHE_FILE_PREFIX,
        BINARY_FILE_SUFFIX,
        TEMP_FILE_ATTRS
    );
  }
}