blob: 631842f78e20d16ef623f07e6b12ca2ae49c9cb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.benchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.FileSystems;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.impl.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class VectoredReadBenchmark {
/**
 * The data file that every benchmark in this class reads. Resolved once,
 * during class initialization, by {@link #getTestDataPath()}.
 */
static final Path DATA_PATH = getTestDataPath();
/**
 * Name of the system property that selects the data file.
 * Although declared after {@link #DATA_PATH}, this is a compile-time
 * constant, so its value is already available when
 * {@link #getTestDataPath()} runs during static initialization.
 */
static final String DATA_PATH_PROPERTY = "bench.data";
/** Number of bytes requested by each individual read (64 KiB). */
static final int READ_SIZE = 64 * 1024;
/**
 * Distance in bytes between the start offsets of consecutive reads (1 MiB).
 * Declared as a long so that {@code m * SEEK_SIZE} is computed in long
 * arithmetic.
 */
static final long SEEK_SIZE = 1024L * 1024;
/**
 * Resolve the data file used by the benchmarks.
 * @return the path named by the {@code DATA_PATH_PROPERTY} system property,
 * or {@code /tmp/taxi.orc} when that property is not set.
 */
static Path getTestDataPath() {
  final String configured = System.getProperty(DATA_PATH_PROPERTY);
  if (configured != null) {
    return new Path(configured);
  }
  return new Path("/tmp/taxi.orc");
}
/**
 * Benchmark state selecting which local filesystem implementation is used:
 * {@code "local"} for the {@link LocalFileSystem} itself, {@code "raw"} for
 * the filesystem returned by its {@code getRaw()} method.
 */
@State(Scope.Thread)
public static class FileSystemChoice {
  @Param({"local", "raw"})
  private String fileSystemKind;
  private Configuration conf;
  private FileSystem fs;

  /**
   * Create the configuration and bind {@code fs} according to
   * {@code fileSystemKind}.
   * @throws IllegalArgumentException if the local filesystem cannot be
   * obtained; the underlying IOException is kept as the cause.
   */
  @Setup(Level.Trial)
  public void setup() {
    conf = new Configuration();
    try {
      final LocalFileSystem localFs = FileSystem.getLocal(conf);
      if ("raw".equals(fileSystemKind)) {
        fs = localFs.getRaw();
      } else {
        fs = localFs;
      }
    } catch (IOException ioe) {
      throw new IllegalArgumentException("Can't get filesystem", ioe);
    }
  }
}
/**
 * Benchmark state selecting how read buffers are allocated:
 * {@code "array"} uses {@link ByteBuffer#allocate(int)}, anything else
 * (i.e. {@code "direct"}) uses {@link ByteBuffer#allocateDirect(int)}.
 */
@State(Scope.Thread)
public static class BufferChoice {
  @Param({"direct", "array"})
  private String bufferKind;
  private IntFunction<ByteBuffer> allocate;

  /**
   * Bind {@code allocate} to the allocator matching {@code bufferKind}.
   */
  @Setup(Level.Trial)
  public void setup() {
    if ("array".equals(bufferKind)) {
      allocate = ByteBuffer::allocate;
    } else {
      allocate = ByteBuffer::allocateDirect;
    }
  }
}
/**
 * Read 100 ranges of {@code READ_SIZE} bytes, spaced {@code SEEK_SIZE}
 * bytes apart, with a single {@code readVectored} call and then wait for
 * the data of every range.
 * @param fsChoice supplies the filesystem to open the data file on
 * @param bufferChoice supplies the buffer allocator handed to readVectored
 * @param blackhole consumes each range's buffer so the reads are not elided
 * @throws Exception if opening the file or any of the reads fails
 */
@Benchmark
public void asyncRead(FileSystemChoice fsChoice,
    BufferChoice bufferChoice,
    Blackhole blackhole) throws Exception {
  // try-with-resources: the stream is closed even when a read fails,
  // so a failing iteration does not leak a file handle.
  try (FSDataInputStream stream = fsChoice.fs.open(DATA_PATH)) {
    List<FileRange> ranges = new ArrayList<>();
    for (int m = 0; m < 100; ++m) {
      FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE);
      ranges.add(range);
    }
    stream.readVectored(ranges, bufferChoice.allocate);
    for (FileRange range : ranges) {
      // get() blocks until this range's data is available.
      blackhole.consume(range.getData().get());
    }
  }
}
/**
 * Tracks a fixed number of outstanding reads and lets a caller block in
 * {@link #join()} until the count reaches zero or a failure is reported.
 * All state is accessed from synchronized methods only.
 */
static class Joiner implements CompletionHandler<ByteBuffer, FileRange> {
  private int pending;
  private final ByteBuffer[] buffers;
  private Throwable failure;

  /**
   * @param total the number of completions to wait for
   */
  Joiner(int total) {
    pending = total;
    buffers = new ByteBuffer[total];
  }

  /**
   * Record one completion without storing a buffer; wakes a waiter when
   * the outstanding count reaches zero.
   */
  synchronized void finish() {
    if (--pending == 0) {
      notify();
    }
  }

  /**
   * Block until nothing is outstanding or a failure has been reported.
   * @return the array of buffers stored through {@link #completed}
   * @throws InterruptedException if interrupted while waiting
   * @throws IOException wrapping the reported failure, if there was one
   */
  synchronized ByteBuffer[] join() throws InterruptedException, IOException {
    while (failure == null && pending > 0) {
      wait();
    }
    if (failure == null) {
      return buffers;
    }
    throw new IOException("problem reading", failure);
  }

  /**
   * Record one completion and store its buffer in the slot indexed by the
   * decremented outstanding count.
   */
  @Override
  public synchronized void completed(ByteBuffer buffer, FileRange attachment) {
    pending -= 1;
    buffers[pending] = buffer;
    if (pending == 0) {
      notify();
    }
  }

  /**
   * Remember the failure and wake a waiter so {@link #join()} can throw.
   */
  @Override
  public synchronized void failed(Throwable exc, FileRange attachment) {
    failure = exc;
    notify();
  }
}
/**
 * A file range that acts as its own completion handler for reads on an
 * {@link AsynchronousFileChannel}: each completion either re-issues the
 * read for the bytes still missing, or, once the whole range is read,
 * flips the buffer and tells the {@link Joiner} that this range is done.
 */
static class FileRangeCallback extends FileRangeImpl implements
    CompletionHandler<Integer, FileRangeCallback> {
  private final AsynchronousFileChannel channel;
  private final ByteBuffer buffer;
  /** Number of bytes of this range read so far. */
  private int completed = 0;
  private final Joiner joiner;

  /**
   * @param channel the channel follow-up reads are issued on
   * @param offset the file offset at which this range starts
   * @param length the number of bytes in this range
   * @param joiner notified when this range finishes or fails
   * @param buffer the buffer the channel reads into
   */
  FileRangeCallback(AsynchronousFileChannel channel, long offset,
      int length, Joiner joiner, ByteBuffer buffer) {
    super(offset, length);
    this.channel = channel;
    this.joiner = joiner;
    this.buffer = buffer;
  }

  /**
   * Handle the completion of one read.
   * @param result the number of bytes read, or -1 at end of file
   * @param attachment unused; this object is its own attachment
   */
  @Override
  public void completed(Integer result, FileRangeCallback attachment) {
    final int bytes = result;
    if (bytes == -1) {
      failed(new EOFException("Read past end of file"), this);
      // The range has been reported as failed: stop here rather than
      // adding -1 to the byte count and issuing yet another read.
      return;
    }
    completed += bytes;
    if (completed < this.getLength()) {
      // Short read: ask for the remainder, starting after what we have.
      channel.read(buffer, this.getOffset() + completed, this, this);
    } else {
      buffer.flip();
      joiner.finish();
    }
  }

  /**
   * Forward a read failure to the joiner.
   * @param exc the cause of the failure
   * @param attachment unused; this object is its own attachment
   */
  @Override
  public void failed(Throwable exc, FileRangeCallback attachment) {
    joiner.failed(exc, this);
  }
}
/**
 * Read 100 ranges of {@code READ_SIZE} bytes, spaced {@code SEEK_SIZE}
 * bytes apart, directly through an {@link AsynchronousFileChannel} and
 * wait for all of them to finish.
 * @param bufferChoice supplies the allocator for each range's buffer
 * @param blackhole consumes the list of ranges once all reads are done
 * @throws Exception if the file cannot be opened or any read fails
 */
@Benchmark
public void asyncFileChanArray(BufferChoice bufferChoice,
    Blackhole blackhole) throws Exception {
  // The joiner must be told exactly how many ranges are submitted.
  final int rangeCount = 100;
  java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString());
  List<FileRangeImpl> ranges = new ArrayList<>();
  // try-with-resources: the channel is closed even when join() throws,
  // so a failing iteration does not leak a file handle.
  try (AsynchronousFileChannel channel =
      AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
    Joiner joiner = new Joiner(rangeCount);
    for (int m = 0; m < rangeCount; ++m) {
      ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE);
      FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE,
          READ_SIZE, joiner, buffer);
      ranges.add(range);
      // The range is both the attachment and the completion handler.
      channel.read(buffer, range.getOffset(), range, range);
    }
    joiner.join();
  }
  blackhole.consume(ranges);
}
/**
 * Read 100 ranges of {@code READ_SIZE} bytes, spaced {@code SEEK_SIZE}
 * bytes apart, one after another with positioned {@code readFully} calls.
 * @param fsChoice supplies the filesystem to open the data file on
 * @param blackhole consumes the list of filled buffers
 * @throws Exception if opening the file or any of the reads fails
 */
@Benchmark
public void syncRead(FileSystemChoice fsChoice,
    Blackhole blackhole) throws Exception {
  // try-with-resources: the stream is closed even when a read fails,
  // so a failing iteration does not leak a file handle.
  try (FSDataInputStream stream = fsChoice.fs.open(DATA_PATH)) {
    List<byte[]> result = new ArrayList<>();
    for (int m = 0; m < 100; ++m) {
      byte[] buffer = new byte[READ_SIZE];
      stream.readFully(m * SEEK_SIZE, buffer);
      result.add(buffer);
    }
    blackhole.consume(result);
  }
}
/**
 * Run the benchmarks in a single forked JVM.
 * @param args optionally, the pathname of a 100MB data file as the first
 * element. It is handed to the forked JVM through the
 * {@code DATA_PATH_PROPERTY} system property; when no argument is given
 * the property is not set and the benchmark's default data path applies.
 * @throws Exception any ex.
 */
public static void main(String[] args) throws Exception {
  List<String> jvmArgs = new ArrayList<>();
  jvmArgs.add("-server");
  jvmArgs.add("-Xms256m");
  jvmArgs.add("-Xmx2g");
  // Only forward the data path when one was supplied, instead of failing
  // with an ArrayIndexOutOfBoundsException on an empty argument list.
  if (args.length > 0) {
    jvmArgs.add("-D" + DATA_PATH_PROPERTY + "=" + args[0]);
  }
  OptionsBuilder opts = new OptionsBuilder();
  opts.include("VectoredReadBenchmark");
  opts.jvmArgs(jvmArgs.toArray(new String[0]));
  opts.forks(1);
  new Runner(opts.build()).run();
}
}