/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * An abstract raw erasure decoder that is meant to be extended by concrete
 * decoder implementations.
 *
 * It implements the {@link RawErasureDecoder} interface.
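 *
 * <p>A hedged usage sketch of the API this class implements; the concrete
 * {@code RSRawDecoder} subclass and the (6, 3) schema below are assumptions
 * for illustration only:</p>
 *
 * <pre>{@code
 *   RawErasureDecoder decoder = new RSRawDecoder(6, 3); // 6 data + 3 parity
 *   byte[][] inputs = new byte[9][];      // one slot per unit
 *   // ... fill surviving units; leave erased units (say 0 and 2) null
 *   int[] erasedIndexes = {0, 2};
 *   byte[][] outputs = new byte[2][1024]; // one buffer per erased unit
 *   decoder.decode(inputs, erasedIndexes, outputs);
 *   // outputs now hold the recovered contents of units 0 and 2
 * }</pre>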
*/
@InterfaceAudience.Private
public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
    implements RawErasureDecoder {

  public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) {
    super(numDataUnits, numParityUnits);
  }

  @Override
  public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                     ByteBuffer[] outputs) {
    checkParameters(inputs, erasedIndexes, outputs);

    ByteBuffer validInput = findFirstValidInput(inputs);
    boolean usingDirectBuffer = validInput.isDirect();
    int dataLen = validInput.remaining();
    if (dataLen == 0) {
      return;
    }
    checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
    checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);

    if (usingDirectBuffer) {
      doDecode(inputs, erasedIndexes, outputs);
      return;
    }

    // Heap buffers: unwrap each ByteBuffer into its backing array plus the
    // absolute offset of its current position, then decode on the arrays.
    int[] inputOffsets = new int[inputs.length];
    int[] outputOffsets = new int[outputs.length];
    byte[][] newInputs = new byte[inputs.length][];
    byte[][] newOutputs = new byte[outputs.length][];

    ByteBuffer buffer;
    for (int i = 0; i < inputs.length; ++i) {
      buffer = inputs[i];
      if (buffer != null) {
        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
        newInputs[i] = buffer.array();
      }
    }

    for (int i = 0; i < outputs.length; ++i) {
      buffer = outputs[i];
      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
      newOutputs[i] = buffer.array();
    }

    doDecode(newInputs, inputOffsets, dataLen,
        erasedIndexes, newOutputs, outputOffsets);

    for (int i = 0; i < inputs.length; ++i) {
      buffer = inputs[i];
      if (buffer != null) {
        // dataLen bytes consumed
        buffer.position(buffer.position() + dataLen);
      }
    }
  }
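
  /*
   * A hedged note on the offset arithmetic above: for a heap buffer created
   * as ByteBuffer.wrap(arr, 16, 1024), arrayOffset() is 0 and position() is
   * 16, so the decode reads arr starting at absolute index 16. A buffer
   * obtained via slice() may contribute a non-zero arrayOffset() instead.
   */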

  /**
   * Perform the real decoding using Direct ByteBuffer.
   * @param inputs Direct ByteBuffers expected
   * @param erasedIndexes indexes of erased units in the inputs array
   * @param outputs Direct ByteBuffers expected
   */
  protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
                                   ByteBuffer[] outputs);
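
  /*
   * A hedged sketch of what an override can look like, loosely modeled on a
   * single-parity XOR code (exactly one erased unit recoverable). This is
   * illustrative only, not the actual XORRawDecoder implementation:
   *
   *   @Override
   *   protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
   *                           ByteBuffer[] outputs) {
   *     ByteBuffer output = outputs[0];
   *     int dataLen = findFirstValidInput(inputs).remaining();
   *     int outPos = output.position();
   *     for (int j = 0; j < dataLen; j++) {
   *       byte b = 0;
   *       for (int i = 0; i < inputs.length; i++) {
   *         if (inputs[i] != null) { // XOR all surviving units together
   *           b ^= inputs[i].get(inputs[i].position() + j);
   *         }
   *       }
   *       output.put(outPos + j, b); // absolute put; positions untouched
   *     }
   *   }
   */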

  @Override
  public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
    checkParameters(inputs, erasedIndexes, outputs);

    byte[] validInput = findFirstValidInput(inputs);
    int dataLen = validInput.length;
    if (dataLen == 0) {
      return;
    }
    checkParameterBuffers(inputs, true, dataLen, false);
    checkParameterBuffers(outputs, false, dataLen, true);

    int[] inputOffsets = new int[inputs.length]; // ALL ZERO
    int[] outputOffsets = new int[outputs.length]; // ALL ZERO

    doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs,
        outputOffsets);
  }

  /**
   * Perform the real decoding using byte arrays, supporting offsets and
   * lengths.
   * @param inputs the input byte arrays to read data from
   * @param inputOffsets offsets into the input byte arrays to read data from
   * @param dataLen how much data is to be read and decoded
   * @param erasedIndexes indexes of erased units in the inputs array
   * @param outputs the output byte arrays to write resultant data into
   * @param outputOffsets offsets into the output byte arrays at which to
   *                      write resultant data
   */
  protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
                                   int dataLen, int[] erasedIndexes,
                                   byte[][] outputs, int[] outputOffsets);
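
  /*
   * A hedged sketch of the same XOR-style recovery over byte arrays, showing
   * how the offset arrays are meant to be honored (illustrative only):
   *
   *   @Override
   *   protected void doDecode(byte[][] inputs, int[] inputOffsets,
   *                           int dataLen, int[] erasedIndexes,
   *                           byte[][] outputs, int[] outputOffsets) {
   *     for (int j = 0; j < dataLen; j++) {
   *       byte b = 0;
   *       for (int i = 0; i < inputs.length; i++) {
   *         if (inputs[i] != null) {
   *           b ^= inputs[i][inputOffsets[i] + j]; // read at given offset
   *         }
   *       }
   *       outputs[0][outputOffsets[0] + j] = b;    // write at given offset
   *     }
   *   }
   */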

  @Override
  public void decode(ECChunk[] inputs, int[] erasedIndexes,
                     ECChunk[] outputs) {
    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
    decode(newInputs, erasedIndexes, newOutputs);
  }

  /**
   * Check and validate decoding parameters, throwing an exception on any
   * violation. The checking assumes it is an MDS code; subclasses for other
   * codes can override this.
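   *
   * <p>For example, with a hypothetical (6 data, 3 parity) MDS schema,
   * {@code inputs.length} must be 9, at most 3 units may be erased, and at
   * least 6 non-null inputs are required for the erased units to be
   * recoverable.</p>
   *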
   * @param inputs input buffers to check
   * @param erasedIndexes indexes of erased units in the inputs array
   * @param outputs output buffers to check
   */
  protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
                                     T[] outputs) {
    if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
      throw new IllegalArgumentException("Invalid inputs length");
    }

    if (erasedIndexes.length != outputs.length) {
      throw new HadoopIllegalArgumentException(
          "erasedIndexes and outputs mismatch in length");
    }

    if (erasedIndexes.length > getNumParityUnits()) {
      throw new HadoopIllegalArgumentException(
          "Too many erased, not recoverable");
    }

    int validInputs = 0;
    for (T input : inputs) {
      if (input != null) {
        validInputs += 1;
      }
    }

    if (validInputs < getNumDataUnits()) {
      throw new HadoopIllegalArgumentException(
          "Not enough valid inputs are provided, not recoverable");
    }
  }

  /**
   * Get indexes into inputs array for items marked as null, either erased or
   * not to read.
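   * For example, if {@code inputs} holds {@code [unit0, null, unit2, null]},
   * this returns {@code [1, 3]}.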
   * @param inputs input buffers to examine
   * @return indexes into inputs array
   */
  protected <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
    int[] invalidIndexes = new int[inputs.length];
    int idx = 0;
    for (int i = 0; i < inputs.length; i++) {
      if (inputs[i] == null) {
        invalidIndexes[idx++] = i;
      }
    }

    return Arrays.copyOf(invalidIndexes, idx);
  }

  /**
   * Find the first valid (non-null) input among all the inputs.
   * @param inputs input buffers to look for a valid input in
   * @return the first valid input
   */
  protected static <T> T findFirstValidInput(T[] inputs) {
    for (T input : inputs) {
      if (input != null) {
        return input;
      }
    }

    throw new HadoopIllegalArgumentException(
        "Invalid inputs found, all inputs are null");
  }
}