blob: eebc0c1ef953c1b7a714f7fe53ba4e19235455e9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.sqoop.io.*;
/**
* Stress test LobFiles by writing a bunch of different files and reading
* them back in various ways.
* Run with: src/scripts/run-perftest.sh LobFileStressTest
*/
public class LobFileStressTest {
// Big records in testBigFile() are 5 GB each.
public static final long LARGE_RECORD_LEN = 5L * 1024L * 1024L * 1024L;
private int numRandomTrials = 1000000;
private Configuration conf;
private boolean allPassed;
private long lastCompressPos; // start offset of the last record in the file.
private long lastRawPos;
public LobFileStressTest() {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
allPassed = true;
}
private Path getPath(boolean compress) {
if (compress) {
return new Path("compressed.lob");
} else {
return new Path("integers.lob");
}
}
private long getLastRecordPos(boolean compress) {
if (compress) {
return lastCompressPos;
} else {
return lastRawPos;
}
}
private void setLastRecordPos(long pos, boolean compress) {
if (compress) {
lastCompressPos = pos;
} else {
lastRawPos = pos;
}
}
private int getNumRecords(boolean compress) {
if (!compress) {
return 40000000; // 40 million.
} else {
return 5000000; // 5 million; the compressor is just too slow for 40M.
}
}
private void writeIntegerFile(boolean compress) throws Exception {
boolean passed = false;
try {
System.out.print("Writing integers file. compress=" + compress + ". ");
Path p = getPath(compress);
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(p)) {
fs.delete(p, false);
}
String codecName = compress ? "deflate" : null;
LobFile.Writer w = LobFile.create(p, conf, false, codecName);
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRecords; i++) {
setLastRecordPos(w.tell(), compress);
OutputStream os = w.writeBlobRecord(0);
DataOutputStream dos = new DataOutputStream(os);
dos.writeInt(i);
dos.close();
os.close();
}
w.close();
System.out.println("PASS");
passed = true;
} finally {
if (!passed) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private void testSequentialScan(boolean compress) throws Exception {
// Write a LobFile containing several million integers
// and read the entire resulting file, verify that they
// all appear in order.
boolean success = false;
System.out.print("Testing sequential scan. compress=" + compress + ". ");
try {
LobFile.Reader r = LobFile.open(getPath(compress), conf);
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRecords; i++) {
r.next();
if (!r.isRecordAvailable()) {
throw new Exception("File ended early; i=" + i);
}
long id = r.getRecordId();
if (id != i) {
throw new Exception("record id mismatch; expected " + i
+ " got " + id);
}
InputStream is = r.readBlobRecord();
DataInputStream dis = new DataInputStream(is);
int val = dis.readInt();
if (val != i) {
throw new Exception("Unexpected value in stream; expected " + i
+ " got " + val);
}
try {
// Make sure we can't read additional data from the stream.
byte b = dis.readByte();
throw new Exception("Got unexpected extra byte : " + b);
} catch (EOFException eof) {
// Expected this. Ignore.
}
dis.close();
}
if (r.next()) {
// We should have finished reading everything already.
throw new Exception("Additional record was waiting at end of file");
}
r.close();
System.out.println("PASS");
success = true;
} finally {
if (!success) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private void testSmallSeeks(boolean compress) throws Exception {
// Write a LobFile containing several million integers
// and seek to randomly selected records. Make sure we can
// access each one.
boolean success = false;
long seed = System.currentTimeMillis();
long lastRecordPos = getLastRecordPos(compress);
System.out.print("Testing random seeks. compress=" + compress + ". "
+ ". seed=" + seed + ". lastPos=" + lastRecordPos +". ");
try {
LobFile.Reader r = LobFile.open(getPath(compress), conf);
Random rnd = new Random(seed);
long curRecord = -1; // current position starts before record 0.
int numRecords = getNumRecords(compress);
for (int i = 0; i < numRandomTrials; i++) {
if (rnd.nextInt(100) < 20 && curRecord != numRecords - 1) {
// Sequentially access the next record.
if (!r.next()) {
throw new Exception("cur record is " + curRecord
+ " but sequential next() failed!");
}
curRecord++;
} else {
// Randomly find a record.
long targetPos = rnd.nextLong() % (lastRecordPos + 1);
r.seek(targetPos);
if (!r.next()) {
throw new Exception("Could not seek to record starting at "
+ targetPos);
}
curRecord = r.getRecordId();
if (curRecord < 0 || curRecord >= numRecords) {
throw new Exception("Unexpected record id " + curRecord);
}
}
// In either case, read the record and verify the data's correct.
InputStream is = r.readBlobRecord();
DataInputStream dis = new DataInputStream(is);
int val = dis.readInt();
if (val != curRecord) {
throw new Exception("Unexpected value in stream; expected "
+ curRecord + " got " + val);
}
try {
// Make sure we can't read additional data from the stream.
byte b = dis.readByte();
throw new Exception("Got unexpected extra byte : " + b);
} catch (EOFException eof) {
// Expected this. Ignore.
}
dis.close();
}
r.close();
System.out.println("PASS");
success = true;
} finally {
if (!success) {
allPassed = false;
System.out.println("FAIL");
}
}
}
private Path getBigFilePath(boolean compress) {
if (compress) {
return new Path("big-compressed.lob");
} else {
return new Path("big.lob");
}
}
/**
* Check that the bytes in buf are of the form 0 1 2 .. 254 255 0 1 2..
* but offset by the value in 'firstByte'. Returns the next byte value
* we would expect in an adjacent follow-up array of bytes.
*/
private byte checkBuf(byte [] buf, int len, byte firstByte) throws Exception {
byte b = firstByte;
for (int i = 0; i < len; i++) {
if (buf[i] != b++) {
throw new Exception("Expected value " + b + " but got " + buf[i]);
}
}
return b;
}
/**
* Check that a big record (from testBigFile) matches its expected
* contract.
* The reader should be positioned at the beginning of the record
* with id 'expectedId'.
*/
private void checkBigRecord(LobFile.Reader r, long expectedId)
throws Exception {
if (!r.isRecordAvailable()) {
throw new Exception("No record available");
}
if (r.getRecordId() != expectedId) {
throw new Exception("Expected record id=" + expectedId
+ "; got " + r.getRecordId());
}
// Read all the data in this record, check that it is the data
// we expect. The bytes in each record increase 0 1 2 .. 254 255 0 1 2..
// but each record's start byte is offset by its record id.
long received = 0;
InputStream is = r.readBlobRecord();
long expected = LARGE_RECORD_LEN;
final int BUF_SIZE = 16384;
byte [] buf = new byte[BUF_SIZE];
byte last = (byte) r.getRecordId();
while (expected > 0) {
int thisRead = is.read(buf);
if (thisRead == -1) {
break;
}
last = checkBuf(buf, thisRead, last);
expected -= thisRead;
}
if (expected > 0) {
throw new Exception("Couldn't read all the data! expected "
+ expected + " more bytes");
}
if (is.read() != -1) {
throw new Exception("Got an extra byte! Expected no more data.");
}
}
private void testBigFile(boolean compress) throws Exception {
// Write a file containing 5 GB records.
final int NUM_RECORDS = 5;
boolean passed = false;
try {
System.out.print("Testing large file operations. compress="
+ compress + ". ");
Path p = getBigFilePath(compress);
long [] startOffsets = new long[NUM_RECORDS];
// Write the file. Five records, 5 GB a piece.
System.out.print("Testing write. ");
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(p)) {
fs.delete(p, false);
}
String codecName = compress ? "deflate" : null;
System.out.println("record size: " + LARGE_RECORD_LEN);
LobFile.Writer w = LobFile.create(p, conf, false, codecName);
for (int i = 0; i < NUM_RECORDS; i++) {
startOffsets[i] = w.tell();
System.out.println("Starting record " + i + " at " + startOffsets[i]);
OutputStream os = w.writeBlobRecord(0);
for (long v = 0; v < LARGE_RECORD_LEN; v++) {
long byteVal = (((long) i) + v) & 0xFF;
os.write((int) byteVal);
}
os.close();
}
w.close();
System.out.println("PASS");
// Iterate past three records, read the fourth.
System.out.print("Testing iterated skipping. ");
LobFile.Reader r = LobFile.open(p, conf);
for (int i = 0; i < 4; i++) {
r.next();
}
checkBigRecord(r, 3);
System.out.println("PASS");
// Seek directly to record 2, read it through.
System.out.print("Testing large backward seek. ");
r.seek(startOffsets[2]);
r.next();
checkBigRecord(r, 2);
System.out.println("PASS");
passed = true;
} finally {
if (!passed) {
allPassed = false;
System.out.println("FAIL");
}
}
}
public void run() throws Exception {
writeIntegerFile(true);
writeIntegerFile(false);
testSequentialScan(false);
testSmallSeeks(false);
testSequentialScan(true);
testSmallSeeks(true);
testBigFile(false);
testBigFile(true);
if (allPassed) {
System.out.println("Tests passed.");
} else {
System.out.println("All tests did not pass!");
System.exit(1);
}
}
public static void main(String [] args) throws Exception {
LobFileStressTest test = new LobFileStressTest();
test.run();
}
}