blob: 895405a35b0b1eaa1da6749a5ca67cf7c71bfc55 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.hdfs.tools;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.conf.Configuration;
public class HdfsIOBenchmark {
private String mode;
private int size;
private int loop;
private Path path;
public HdfsIOBenchmark(String[] args){
mode = args[0];
size = Integer.parseInt(args[1]);
loop = Integer.parseInt(args[2]);
path = new Path(args[3]);
}
/**
 * Runs the benchmark selected by the mode given on the command line.
 *
 * Mode names are matched case-insensitively for every mode (previously only
 * {@code readSequentialDirect} was, which made the other modes inconsistently strict).
 * An unrecognized mode prints the usage text and terminates the JVM with status 0.
 *
 * @throws Exception whatever the selected benchmark method throws
 */
public void run() throws Exception {
    if (mode.equalsIgnoreCase("writeSequentialHeap")){
        writeSequentialHeap();
    } else if (mode.equalsIgnoreCase("readSequentialDirect")){
        readSequentialDirect();
    } else if (mode.equalsIgnoreCase("readSequentialHeap")){
        readSequentialHeap();
    } else if (mode.equalsIgnoreCase("readRandomDirect")){
        readRandomDirect();
    } else if (mode.equalsIgnoreCase("readRandomHeap")){
        readRandomHeap();
    } else if (mode.equalsIgnoreCase("getFile")){
        getFile();
    } else if (mode.equalsIgnoreCase("createFile")){
        createFile();
    } else if (mode.equalsIgnoreCase("enumerateDir")){
        enumerateDir();
    } else if (mode.equalsIgnoreCase("keyGet")){
        keyGet();
    } else if (mode.equalsIgnoreCase("browseDir")){
        browseDir();
    } else {
        usage();
        System.exit(0);
    }
}
/**
 * Prints the command-line synopsis to standard output.
 *
 * The mode list names every mode the dispatcher in this class accepts; the original
 * text omitted getFile, createFile, enumerateDir, keyGet and browseDir.
 */
public static void usage(){
    System.out.println("Usage:");
    System.out.println("hdfsbench <readSequentialDirect|readSequentialHeap|readRandomDirect|readRandomHeap|writeSequentialHeap|getFile|createFile|enumerateDir|keyGet|browseDir> <size> <iterations> <path>");
}
/**
 * Benchmarks sequential writes: creates {@code path} and writes a {@code size}-byte heap
 * buffer to it {@code loop} times, then flushes.
 *
 * Prints execution time (seconds), operation count, bytes written, throughput
 * (bits / second / 1024 / 1024) and latency (microseconds per write call).
 * The output stream and the file system are closed even when a write fails.
 *
 * @throws Exception if creating, writing, flushing or closing fails
 */
public void writeSequentialHeap() throws Exception {
    System.out.println("writing sequential file in heap mode " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        FSDataOutputStream outstream = fs.create(path);
        try {
            byte[] buf = new byte[size];
            double sumbytes = 0;
            double ops = 0;
            // This is a write benchmark; the label used to say "read size" (copy/paste slip).
            System.out.println("write size " + size);
            System.out.println("operations " + loop);

            long start = System.currentTimeMillis();
            while (ops < loop) {
                outstream.write(buf, 0, buf.length);
                sumbytes = sumbytes + buf.length;
                ops = ops + 1.0;
            }
            // The flush is part of the timed region.
            outstream.flush();
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start)) / 1000.0;
            double throughput = 0.0;
            double latency = 0.0;
            double sumbits = sumbytes * 8.0;
            if (executionTime > 0) {
                throughput = sumbits / executionTime / 1024.0 / 1024.0;
                // Guard against loop == 0, which would otherwise divide by zero.
                if (ops > 0) {
                    latency = 1000000.0 * executionTime / ops;
                }
            }

            System.out.println("execution time " + executionTime);
            System.out.println("ops " + ops);
            System.out.println("sumbytes " + sumbytes);
            System.out.println("throughput " + throughput);
            System.out.println("latency " + latency);
            System.out.println("closing stream");
        } finally {
            outstream.close();
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks sequential reads of {@code path} into a direct {@link ByteBuffer} of
 * {@code size} bytes.
 *
 * Issues up to {@code loop} read calls. When a read returns no data the stream is rewound
 * to offset 0 so the file is read again; if the stream is already at offset 0 (nothing can
 * be read at all) the loop stops. Every read call, including one that returned no data,
 * counts as an operation.
 *
 * Prints execution time (seconds), operation count, bytes read, throughput
 * (bits / second / 1024 / 1024) and latency (microseconds per operation).
 * The input stream and the file system are closed even when a read fails.
 *
 * @throws Exception if opening, reading, seeking or closing fails
 */
public void readSequentialDirect() throws Exception {
    System.out.println("reading sequential file in direct mode " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        FileStatus status = fs.getFileStatus(path);
        FSDataInputStream instream = fs.open(path);
        try {
            ByteBuffer buf = ByteBuffer.allocateDirect(size);
            double sumbytes = 0;
            double ops = 0;
            System.out.println("file capacity " + status.getLen());
            System.out.println("read size " + size);
            System.out.println("operations " + loop);

            long start = System.currentTimeMillis();
            while (ops < loop) {
                buf.clear();
                double ret = (double) instream.read(buf);
                ops = ops + 1.0;
                if (ret > 0) {
                    sumbytes = sumbytes + ret;
                } else if (instream.getPos() == 0) {
                    // No data even from the very beginning: nothing more to measure.
                    break;
                } else {
                    // End of file reached: wrap around and keep reading.
                    instream.seek(0);
                }
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start)) / 1000.0;
            double throughput = 0.0;
            double latency = 0.0;
            double sumbits = sumbytes * 8.0;
            if (executionTime > 0) {
                throughput = sumbits / executionTime / 1024.0 / 1024.0;
                // Guard against loop == 0, which would otherwise divide by zero.
                if (ops > 0) {
                    latency = 1000000.0 * executionTime / ops;
                }
            }

            System.out.println("execution time " + executionTime);
            System.out.println("ops " + ops);
            System.out.println("sumbytes " + sumbytes);
            System.out.println("throughput " + throughput);
            System.out.println("latency " + latency);
            System.out.println("closing stream");
        } finally {
            instream.close();
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks sequential reads of {@code path} into a heap byte array of {@code size} bytes,
 * using this class's {@code read(FSDataInputStream, byte[])} helper for each operation.
 *
 * Issues up to {@code loop} operations. When an operation returns no data the stream is
 * rewound to offset 0 so the file is read again; if the stream is already at offset 0 the
 * loop stops. Every operation, including one that returned no data, is counted.
 *
 * Prints execution time (seconds), operation count, bytes read, throughput
 * (bits / second / 1024 / 1024) and latency (microseconds per operation).
 * The input stream and the file system are closed even when a read fails.
 *
 * @throws Exception if opening, reading, seeking or closing fails
 */
public void readSequentialHeap() throws Exception {
    System.out.println("reading sequential file in heap mode " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        FileStatus status = fs.getFileStatus(path);
        FSDataInputStream instream = fs.open(path);
        try {
            byte[] buf = new byte[size];
            double sumbytes = 0;
            double ops = 0;
            System.out.println("file capacity " + status.getLen());
            System.out.println("read size " + size);
            System.out.println("operations " + loop);

            long start = System.currentTimeMillis();
            while (ops < loop) {
                double ret = (double) this.read(instream, buf);
                ops = ops + 1.0;
                if (ret > 0) {
                    sumbytes = sumbytes + ret;
                } else if (instream.getPos() == 0) {
                    // No data even from the very beginning: nothing more to measure.
                    break;
                } else {
                    // End of file reached: wrap around and keep reading.
                    instream.seek(0);
                }
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start)) / 1000.0;
            double throughput = 0.0;
            double latency = 0.0;
            double sumbits = sumbytes * 8.0;
            if (executionTime > 0) {
                throughput = sumbits / executionTime / 1024.0 / 1024.0;
                // Guard against loop == 0, which would otherwise divide by zero.
                if (ops > 0) {
                    latency = 1000000.0 * executionTime / ops;
                }
            }

            System.out.println("execution time " + executionTime);
            System.out.println("ops " + ops);
            System.out.println("sumbytes " + sumbytes);
            System.out.println("throughput " + throughput);
            System.out.println("latency " + latency);
            System.out.println("closing stream");
        } finally {
            instream.close();
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks random reads of {@code path} into a direct {@link ByteBuffer} of {@code size}
 * bytes.
 *
 * Each of up to {@code loop} operations seeks to a random offset in
 * {@code [0, fileLength - size)} and issues one read. The loop stops early on the first
 * read that returns no data. If the file is shorter than the buffer the offset range is
 * clamped to 0, so every read starts at the beginning instead of seeking to a negative
 * offset.
 *
 * Prints execution time (seconds), operation count, bytes read, throughput
 * (bits / second / 1024 / 1024) and latency (microseconds per operation).
 * The input stream and the file system are closed even when a read fails.
 *
 * @throws Exception if opening, seeking, reading or closing fails
 */
public void readRandomDirect() throws Exception{
    System.out.println("reading random file in direct mode " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        FileStatus status = fs.getFileStatus(path);
        FSDataInputStream instream = fs.open(path);
        try {
            ByteBuffer buf = ByteBuffer.allocateDirect(size);
            double sumbytes = 0;
            double ops = 0;
            // Clamp: a file smaller than the buffer would otherwise yield negative offsets.
            long _range = Math.max(0L, status.getLen() - ((long) buf.capacity()));
            double range = (double) _range;
            Random random = new Random();
            System.out.println("file capacity " + status.getLen());
            System.out.println("read size " + size);
            System.out.println("operations " + loop);

            long start = System.currentTimeMillis();
            while (ops < loop) {
                buf.clear();
                long offset = (long) (range * random.nextDouble());
                instream.seek(offset);
                double ret = (double) instream.read(buf);
                if (ret <= 0) {
                    break;
                }
                sumbytes = sumbytes + ret;
                ops = ops + 1.0;
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start)) / 1000.0;
            double throughput = 0.0;
            double latency = 0.0;
            double sumbits = sumbytes * 8.0;
            if (executionTime > 0) {
                throughput = sumbits / executionTime / 1024.0 / 1024.0;
                // Guard against zero completed operations (division by zero).
                if (ops > 0) {
                    latency = 1000000.0 * executionTime / ops;
                }
            }

            System.out.println("execution time " + executionTime);
            System.out.println("ops " + ops);
            System.out.println("sumbytes " + sumbytes);
            System.out.println("throughput " + throughput);
            System.out.println("latency " + latency);
            System.out.println("closing stream");
        } finally {
            instream.close();
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks random reads of {@code path} into a heap byte array of {@code size} bytes,
 * using this class's {@code read(FSDataInputStream, byte[])} helper for each operation.
 *
 * Each of up to {@code loop} operations seeks to a random offset in
 * {@code [0, fileLength - size)} and reads. The loop stops early on the first operation
 * that returns no data. If the file is shorter than the buffer the offset range is clamped
 * to 0, so every read starts at the beginning instead of seeking to a negative offset.
 *
 * Prints execution time (seconds), operation count, bytes read, throughput
 * (bits / second / 1024 / 1024) and latency (microseconds per operation).
 * The input stream and the file system are closed even when a read fails.
 *
 * @throws Exception if opening, seeking, reading or closing fails
 */
public void readRandomHeap() throws Exception{
    System.out.println("reading random file in heap mode " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        FileStatus status = fs.getFileStatus(path);
        FSDataInputStream instream = fs.open(path);
        try {
            byte[] buf = new byte[size];
            double sumbytes = 0;
            double ops = 0;
            // Clamp: a file smaller than the buffer would otherwise yield negative offsets.
            long _range = Math.max(0L, status.getLen() - ((long) buf.length));
            double range = (double) _range;
            Random random = new Random();
            System.out.println("file capacity " + status.getLen());
            System.out.println("read size " + size);
            System.out.println("operations " + loop);

            long start = System.currentTimeMillis();
            while (ops < loop) {
                long offset = (long) (range * random.nextDouble());
                instream.seek(offset);
                double ret = (double) this.read(instream, buf);
                if (ret <= 0) {
                    break;
                }
                sumbytes = sumbytes + ret;
                ops = ops + 1.0;
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start)) / 1000.0;
            double throughput = 0.0;
            double latency = 0.0;
            double sumbits = sumbytes * 8.0;
            if (executionTime > 0) {
                throughput = sumbits / executionTime / 1024.0 / 1024.0;
                // Guard against zero completed operations (division by zero).
                if (ops > 0) {
                    latency = 1000000.0 * executionTime / ops;
                }
            }

            System.out.println("execution time " + executionTime);
            System.out.println("ops " + ops);
            System.out.println("sumbytes " + sumbytes);
            System.out.println("throughput " + throughput);
            System.out.println("latency " + latency);
            System.out.println("closing stream");
        } finally {
            instream.close();
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks metadata lookups: calls {@code listStatus} on the children
 * {@code path/0 .. path/(loop-1)}.
 *
 * One "operation" is a full pass over all {@code loop} children; {@code size} operations
 * are timed per repetition and the measurement is repeated 4 times. For each repetition the
 * execution time (milliseconds) and the latency per operation (microseconds) are printed.
 * The file system is closed even when a lookup fails.
 *
 * @throws Exception if a lookup or closing the file system fails
 * @throws InterruptedException declared for interface compatibility
 */
void getFile() throws Exception, InterruptedException {
    System.out.println("get file, path " + path + ", outstanding " + size + ", loop " + loop);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        Path paths[] = new Path[loop];
        for (int j = 0; j < loop; j++){
            paths[j] = new Path(path.toString() + "/" + j);
        }

        int repfactor = 4;
        for (int k = 0; k < repfactor; k++){
            long start = System.currentTimeMillis();
            for (int i = 0; i < size; i++){
                //single operation == loop
                for (int j = 0; j < loop; j++){
                    fs.listStatus(paths[j]);
                }
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start));
            // Avoid NaN/Infinity when no operation was requested.
            double latency = size > 0 ? executionTime*1000.0 / ((double) size) : 0.0;
            System.out.println("execution time [ms] " + executionTime);
            System.out.println("latency [us] " + latency);
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks file creation: creates (and immediately closes) {@code loop * size} empty
 * files named {@code 0, 1, 2, ...} under {@code path}.
 *
 * One "operation" is the creation of {@code loop} files; {@code size} operations are timed
 * per repetition and the measurement is repeated 4 times. The file paths are built before
 * the timer starts. Between repetitions the directory is deleted recursively and the thread
 * sleeps for two seconds; after the last repetition the files are left in place.
 * For each repetition the execution time (milliseconds) and the latency per operation
 * (microseconds) are printed. The file system is closed even when a step fails.
 *
 * @throws Exception if a file-system call fails
 * @throws InterruptedException if the pause between repetitions is interrupted
 */
void createFile() throws Exception, InterruptedException {
    System.out.println("create file async hdfs, path " + path + ", size " + size + ", loop " + loop);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        int repfactor = 4;
        int total = loop*size;
        for (int k = 0; k < repfactor; k++){
            LinkedBlockingQueue<Path> pathQueue = new LinkedBlockingQueue<Path>();
            fs.mkdirs(path);
            for (int i = 0; i < total; i++){
                String name = "" + i;
                pathQueue.add(new Path(path, name));
            }

            long start = System.currentTimeMillis();
            for (int i = 0; i < size; i++){
                //single operation == loop
                for (int j = 0; j < loop; j++) {
                    // Named "file" so it no longer shadows the "path" field used below.
                    Path file = pathQueue.poll();
                    fs.create(file).close();
                }
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start));
            // Avoid NaN/Infinity when no operation was requested.
            double latency = size > 0 ? executionTime*1000.0 / ((double) size) : 0.0;
            System.out.println("execution time [ms] " + executionTime);
            System.out.println("latency [us] " + latency);

            if (k < repfactor - 1){
                fs.delete(path, true);
                Thread.sleep(2000);
            }
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks directory enumeration: iterates over all entries returned by a
 * non-recursive {@code listFiles(path)}.
 *
 * One "operation" is one complete enumeration; {@code size} operations are timed per
 * repetition and the measurement is repeated 4 times. For each repetition the execution
 * time (milliseconds) and the latency per operation (microseconds) are printed.
 * The file system is closed even when the enumeration fails.
 *
 * @throws Exception if listing or closing the file system fails
 */
void enumerateDir() throws Exception {
    System.out.println("enumarate dir, path " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        int repfactor = 4;
        for (int k = 0; k < repfactor; k++) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < size; i++) {
                // single operation == one full enumeration of the directory
                RemoteIterator<LocatedFileStatus> iter = fs.listFiles(path, false);
                while (iter.hasNext()) {
                    iter.next();
                }
            }
            long end = System.currentTimeMillis();

            double executionTime = ((double) (end - start));
            // Avoid NaN/Infinity when no operation was requested.
            double latency = size > 0 ? executionTime * 1000.0 / ((double) size) : 0.0;
            System.out.println("execution time [ms] " + executionTime);
            System.out.println("latency [us] " + latency);
        }
    } finally {
        fs.close();
    }
}
/**
 * Benchmarks small-object reads ("key get"): writes {@code loop} files of {@code size}
 * bytes each under {@code path}, then times opening each file, reading it into a direct
 * buffer of {@code size} bytes and closing it.
 *
 * Only the read phase is timed (the direct buffer allocation is inside the timed region,
 * as before). Prints the execution time (milliseconds) and the latency per file
 * (microseconds). Streams and the file system are closed even when a step fails.
 *
 * @throws Exception if a file-system call fails
 */
void keyGet() throws Exception {
    System.out.println("key get, path " + path + ", size " + size + ", loop " + loop);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        Path[] paths = new Path[loop];
        for (int i = 0; i < loop; i++){
            String child = "" + i;
            paths[i] = new Path(path, child);
            System.out.println("path " + paths[i]);
        }

        // Populate phase (not timed).
        byte[] outBuf = new byte[size];
        for (Path p : paths){
            FSDataOutputStream outputStream = fs.create(p);
            try {
                outputStream.write(outBuf);
            } finally {
                outputStream.close();
            }
        }

        long start = System.currentTimeMillis();
        ByteBuffer inBuf = ByteBuffer.allocateDirect(size);
        for (int i = 0; i < loop; i++){
            Path p = paths[i];
            FSDataInputStream inputStream = fs.open(p);
            try {
                inBuf.clear();
                while(inBuf.remaining() > 0){
                    // Stop on end of stream; looping on a non-positive result would spin forever.
                    if (inputStream.read(inBuf) <= 0){
                        break;
                    }
                }
            } finally {
                inputStream.close();
            }
        }
        long end = System.currentTimeMillis();

        double executionTime = ((double) (end - start));
        // Avoid NaN/Infinity when no file was requested.
        double latency = loop > 0 ? executionTime * 1000.0 / ((double) loop) : 0.0;
        System.out.println("execution time [ms] " + executionTime);
        System.out.println("latency [us] " + latency);
    } finally {
        fs.close();
    }
}
/**
 * Reads from {@code stream} until {@code buf} is full or the stream stops delivering data.
 *
 * The previous implementation returned the byte count of the last underlying read call
 * only, and returned -1 when end of stream followed a partial read, so callers summing the
 * result under-counted the bytes actually read. This version returns the total.
 *
 * @param stream the stream to read from
 * @param buf    the destination array; filled from index 0
 * @return the total number of bytes placed into {@code buf}, or -1 if no byte could be read
 *         (including when {@code buf} has length 0)
 * @throws IOException if the underlying read fails
 */
private int read(FSDataInputStream stream, byte[] buf) throws IOException{
    int total = 0;
    while (total < buf.length) {
        int ret = stream.read(buf, total, buf.length - total);
        if (ret <= 0) {
            // End of stream (or no progress): report what has been gathered so far.
            break;
        }
        total += ret;
    }
    return total > 0 ? total : -1;
}
/**
 * Prints the path of every entry returned by a non-recursive {@code listFiles(path)}.
 *
 * No timing is reported. The file system is closed even when the listing fails.
 *
 * @throws Exception if listing or closing the file system fails
 */
void browseDir() throws Exception {
    System.out.println("reading enumarate dir, path " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        //benchmark
        System.out.println("starting benchmark...");
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(path, false);
        while (iter.hasNext()) {
            LocatedFileStatus status = iter.next();
            System.out.println(status.getPath());
        }
    } finally {
        fs.close();
    }
}
/**
 * Command-line entry point.
 *
 * Expects exactly four arguments; otherwise prints the usage text and exits with status 0.
 * If the benchmark throws, the stack trace is printed and the JVM exits with status 1 so
 * that calling scripts can detect the failure (previously the process exited with 0).
 *
 * @param args mode, size, iterations and path, in that order
 */
public static void main(String[] args) {
    if (args.length != 4){
        usage();
        System.exit(0);
    }
    try {
        HdfsIOBenchmark benchmark = new HdfsIOBenchmark(args);
        benchmark.run();
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
}
}