blob: c404d858caf0a1bfd869ceb0ef6136f2248dac12 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.slive;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import static org.apache.hadoop.fs.slive.Constants.BYTES_PER_LONG;
/**
* Class which handles generating data (creating and appending) along with
* ensuring the correct data is written out for the given path name so that it
* can be later verified
*/
class DataWriter {

  /**
   * Header size in bytes: room for the two longs written by
   * {@link #writeHeader(OutputStream, long)} (hash value, then file size)
   */
  private static final int HEADER_LENGTH = (BYTES_PER_LONG * 2);

  /** Size in bytes of each full piece written by writePieces */
  private final int bufferSize;

  /** Source of the per-file hash value stored in the header */
  private final Random rnd;

  /**
   * Class used to hold the number of bytes written and time taken for write
   * operations for callers to use
   */
  static class GenerateOutput {
    private final long bytes;
    private final long time;

    GenerateOutput(long bytesWritten, long timeTaken) {
      this.bytes = bytesWritten;
      this.time = timeTaken;
    }

    /**
     * @return the number of bytes written
     */
    long getBytesWritten() {
      return bytes;
    }

    /**
     * @return the accumulated time taken by the write calls
     */
    long getTimeTaken() {
      return time;
    }

    @Override
    public String toString() {
      return "Wrote " + getBytesWritten() + " bytes " + " which took "
          + getTimeTaken() + " milliseconds";
    }
  }

  /**
   * Class used to hold a byte buffer and offset position for generating data
   */
  private static class GenerateResult {
    private final long offset;
    private final ByteBuffer buffer;

    GenerateResult(long offset, ByteBuffer buffer) {
      this.offset = offset;
      this.buffer = buffer;
    }

    long getOffset() {
      return offset;
    }

    ByteBuffer getBuffer() {
      return buffer;
    }
  }

  /**
   * What a header write returns: the hash value that was written (needed to
   * generate the data that follows) plus the bytes written and the time taken
   * to perform the write
   */
  private static class WriteInfo {
    private final long hashValue;
    private final long bytesWritten;
    private final long timeTaken;

    WriteInfo(long hashValue, long bytesWritten, long timeTaken) {
      this.hashValue = hashValue;
      this.bytesWritten = bytesWritten;
      this.timeTaken = timeTaken;
    }

    long getHashValue() {
      return hashValue;
    }

    long getTimeTaken() {
      return timeTaken;
    }

    long getBytesWritten() {
      return bytesWritten;
    }
  }

  /**
   * Inits with given buffer size (must be at least bytes per long and a
   * multiple of bytes per long)
   *
   * @param rnd
   *          random number generator to use for hash value creation
   *
   * @param bufferSize
   *          size which must be greater than or equal to BYTES_PER_LONG and
   *          which also must be a multiple of BYTES_PER_LONG
   *
   * @throws IllegalArgumentException
   *           if the buffer size is smaller than BYTES_PER_LONG or is not a
   *           multiple of it
   */
  DataWriter(Random rnd, int bufferSize) {
    if (bufferSize < BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Buffer size must be greater than or equal to " + BYTES_PER_LONG);
    }
    if ((bufferSize % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Buffer size must be a multiple of "
          + BYTES_PER_LONG);
    }
    this.bufferSize = bufferSize;
    this.rnd = rnd;
  }

  /**
   * Inits with default buffer size
   *
   * @param rnd
   *          random number generator to use for hash value creation
   */
  DataWriter(Random rnd) {
    this(rnd, Constants.BUFFERSIZE);
  }

  /**
   * Writes the given bytes to the stream (flushing afterwards when
   * Constants.FLUSH_WRITES is set) and measures how long that took
   *
   * @param out
   *          the output stream to write to
   * @param data
   *          the bytes to write
   *
   * @return the elapsed time reported by the Timer for the write (and flush)
   *
   * @throws IOException
   *           if the write or flush fails
   */
  private static long timedWrite(OutputStream out, byte[] data)
      throws IOException {
    long elapsed = 0;
    long startTime = Timer.now();
    out.write(data);
    if (Constants.FLUSH_WRITES) {
      out.flush();
    }
    elapsed += Timer.elapsed(startTime);
    return elapsed;
  }

  /**
   * Generates a partial segment: the leading bytes of the long which the
   * hasher produces for the given offset
   *
   * @param byteAm
   *          the number of bytes to generate (greater than zero and at most
   *          bytes per long)
   * @param offset
   *          the starting offset
   * @param hasher
   *          hasher to use for generating data given an offset
   *
   * @return GenerateResult containing the byte buffer and the given offset
   *         (the offset is not advanced for a partial segment)
   */
  private GenerateResult generatePartialSegment(int byteAm, long offset,
      DataHasher hasher) {
    if (byteAm > BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Partial bytes must be less or equal to " + BYTES_PER_LONG);
    }
    if (byteAm <= 0) {
      throw new IllegalArgumentException(
          "Partial bytes must be greater than zero and not " + byteAm);
    }
    // build the complete long, then keep only its first byteAm bytes
    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
    buf.putLong(hasher.generate(offset));
    ByteBuffer allBytes = ByteBuffer.wrap(new byte[byteAm]);
    allBytes.put(buf.array(), 0, byteAm);
    allBytes.rewind();
    return new GenerateResult(offset, allBytes);
  }

  /**
   * Generates a full segment (aligned to bytes per long) of the given byte
   * amount size, one hashed long per BYTES_PER_LONG step of the offset
   *
   * @param byteAm
   *          long aligned size (greater than zero)
   * @param startOffset
   *          starting hash offset
   * @param hasher
   *          hasher to use for generating data given an offset
   *
   * @return GenerateResult containing new offset (start offset advanced by
   *         the byte amount) and byte buffer
   */
  private GenerateResult generateFullSegment(int byteAm, long startOffset,
      DataHasher hasher) {
    if (byteAm <= 0) {
      throw new IllegalArgumentException(
          "Byte amount must be greater than zero and not " + byteAm);
    }
    if ((byteAm % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Byte amount " + byteAm
          + " must be a multiple of " + BYTES_PER_LONG);
    }
    // generate all the segments
    ByteBuffer allBytes = ByteBuffer.wrap(new byte[byteAm]);
    long offset = startOffset;
    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
    for (int i = 0; i < byteAm; i += BYTES_PER_LONG) {
      buf.rewind();
      buf.putLong(hasher.generate(offset));
      buf.rewind();
      allBytes.put(buf);
      offset += BYTES_PER_LONG;
    }
    allBytes.rewind();
    return new GenerateResult(offset, allBytes);
  }

  /**
   * Writes a set of bytes to the output stream, for full segments it will write
   * out the complete segment but for partial segments, ie when the last
   * position does not fill up a full long then a partial set will be written
   * out containing the needed bytes from the expected full segment
   *
   * @param byteAm
   *          the amount of bytes to write (nothing is written when this is
   *          zero or negative)
   * @param startPos
   *          a BYTES_PER_LONG aligned start position (a negative value is
   *          treated as zero)
   * @param hasher
   *          hasher to use for generating data given an offset
   * @param out
   *          the output stream to write to
   *
   * @return how many bytes were written and the time taken writing them
   *
   * @throws IOException
   *           if a write failure occurs
   */
  private GenerateOutput writePieces(long byteAm, long startPos,
      DataHasher hasher, OutputStream out) throws IOException {
    if (byteAm <= 0) {
      return new GenerateOutput(0, 0);
    }
    if (startPos < 0) {
      startPos = 0;
    }
    int leftOver = (int) (byteAm % bufferSize);
    long fullPieces = byteAm / bufferSize;
    long offset = startPos;
    long bytesWritten = 0;
    long timeTaken = 0;
    // write the full pieces that fit in the buffer size
    for (long i = 0; i < fullPieces; ++i) {
      GenerateResult genData = generateFullSegment(bufferSize, offset, hasher);
      offset = genData.getOffset();
      byte[] buf = genData.getBuffer().array();
      timeTaken += timedWrite(out, buf);
      bytesWritten += buf.length;
    }
    if (leftOver > 0) {
      ByteBuffer leftOverBuf = ByteBuffer.wrap(new byte[leftOver]);
      int bytesLeft = leftOver % BYTES_PER_LONG;
      int alignedBytes = leftOver - bytesLeft;
      // collect the piece which do not fit in the buffer size but is
      // also greater or eq than BYTES_PER_LONG and a multiple of it
      if (alignedBytes > 0) {
        GenerateResult genData = generateFullSegment(alignedBytes, offset,
            hasher);
        offset = genData.getOffset();
        leftOverBuf.put(genData.getBuffer());
      }
      // collect any single partial byte segment
      if (bytesLeft > 0) {
        GenerateResult genData = generatePartialSegment(bytesLeft, offset,
            hasher);
        leftOverBuf.put(genData.getBuffer());
      }
      // do the write of both in a single call
      byte[] buf = leftOverBuf.array();
      timeTaken += timedWrite(out, buf);
      bytesWritten += buf.length;
    }
    return new GenerateOutput(bytesWritten, timeTaken);
  }

  /**
   * Writes to a stream the given number of bytes specified. The header is
   * counted as part of that total, so the header records (and is followed by)
   * the given amount minus the header length. When the given amount is
   * smaller than the header length nothing at all is written.
   *
   * @param byteAm
   *          the file size in number of bytes to write
   *
   * @param out
   *          the outputstream to write to
   *
   * @return the number of bytes written + time taken
   *
   * @throws IOException
   *           if a write failure occurs
   */
  GenerateOutput writeSegment(long byteAm, OutputStream out)
      throws IOException {
    long headerLen = getHeaderLength();
    if (byteAm < headerLen) {
      // not enough bytes to write even the header
      return new GenerateOutput(0, 0);
    }
    // adjust for header length, cannot go negative due to the check above
    long dataLen = byteAm - headerLen;
    WriteInfo header = writeHeader(out, dataLen);
    DataHasher hasher = new DataHasher(header.getHashValue());
    GenerateOutput pRes = writePieces(dataLen, 0, hasher, out);
    long bytesWritten = pRes.getBytesWritten() + header.getBytesWritten();
    long timeTaken = header.getTimeTaken() + pRes.getTimeTaken();
    return new GenerateOutput(bytesWritten, timeTaken);
  }

  /**
   * Gets the header length
   *
   * @return the header length in bytes
   */
  static int getHeaderLength() {
    return HEADER_LENGTH;
  }

  /**
   * Writes a header to the given output stream. The header is a newly drawn
   * random hash value followed by the given file size, each stored as a long.
   *
   * @param os
   *          output stream to write to
   *
   * @param fileSize
   *          the file size to write
   *
   * @return WriteInfo holding the hash value written, the header length and
   *         the time taken
   *
   * @throws IOException
   *           if a write failure occurs
   */
  WriteInfo writeHeader(OutputStream os, long fileSize) throws IOException {
    int headerLen = getHeaderLength();
    ByteBuffer buf = ByteBuffer.wrap(new byte[headerLen]);
    long hash = rnd.nextLong();
    buf.putLong(hash);
    buf.putLong(fileSize);
    long elapsed = timedWrite(os, buf.array());
    return new WriteInfo(hash, headerLen, elapsed);
  }
}