/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;

import java.nio.ByteBuffer;
/**
 * A raw erasure decoder in the Reed-Solomon (RS) code scheme, implemented in
 * pure Java for environments where a native implementation isn't available.
 * Please prefer native implementations whenever possible.
 *
 * Currently this implementation also computes and decodes units that don't
 * need to be read, due to a limitation in the underlying GF implementation.
 * This will be addressed in HADOOP-11871.
 */
public class RSRawDecoder extends AbstractRawErasureDecoder {
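  /*
   * A minimal usage sketch (illustrative values only; assumes the
   * decode(byte[][], int[], byte[][]) entry point inherited from the
   * abstract base class):
   *
   *   RSRawDecoder decoder = new RSRawDecoder(6, 3);
   *   byte[][] inputs = new byte[9][];        // 6 data units + 3 parity units
   *   // ... fill the available units; leave erased positions null ...
   *   int[] erasedIndexes = {0};              // unit 0 was lost
   *   byte[][] outputs = {new byte[unitLen]}; // unitLen: length of one unit
   *   decoder.decode(inputs, erasedIndexes, outputs);
   */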
// To describe and calculate the needed Vandermonde matrix
private int[] errSignature;
private int[] primitivePower;
  /**
   * We need a set of reusable buffers, either for the bytes-array decoding
   * version or for the direct-buffer decoding version, but normally not both.
   *
   * For output, in addition to the valid buffers passed in by the caller,
   * we need to provide extra buffers for the internal decoding
   * implementation. The caller should provide no more than numParityUnits
   * output buffers, but at least one. The remaining output positions are
   * covered by buffers borrowed from bytesArrayBuffers for the bytes-array
   * version, or from directBuffers for the direct-buffer version.
   */
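  /*
   * For example (illustrative numbers only): with numParityUnits = 3, if the
   * caller requests 2 erased units while a third unit is simply not read,
   * the 2 caller-provided buffers are used directly for the requested
   * positions and 1 buffer is borrowed from the pool for the remaining
   * not-to-read position.
   */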
// Reused buffers for decoding with bytes arrays
private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][];
private byte[][] adjustedByteArrayOutputsParameter =
new byte[getNumParityUnits()][];
private int[] adjustedOutputOffsets = new int[getNumParityUnits()];
// Reused buffers for decoding with direct ByteBuffers
private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()];
private ByteBuffer[] adjustedDirectBufferOutputsParameter =
new ByteBuffer[getNumParityUnits()];

  public RSRawDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
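    // numDataUnits + numParityUnits must be less than the Galois field
    // size (e.g. 256 for GF(2^8)), or the code cannot be constructed.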
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
throw new HadoopIllegalArgumentException(
"Invalid numDataUnits and numParityUnits");
}
this.errSignature = new int[numParityUnits];
this.primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
}

  private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes,
                            ByteBuffer[] outputs) {
ByteBuffer valid = findFirstValidInput(inputs);
int dataLen = valid.remaining();
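    // Record the error signature (the primitive power of each erased
    // position) and substitute the inputs at each primitive power to build
    // the right-hand side of a Vandermonde system, which is then solved in
    // place to recover the erased units.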
for (int i = 0; i < erasedIndexes.length; i++) {
errSignature[i] = primitivePower[erasedIndexes[i]];
RSUtil.GF.substitute(inputs, dataLen, outputs[i], primitivePower[i]);
}
RSUtil.GF.solveVandermondeSystem(errSignature,
outputs, erasedIndexes.length);
}

  private void doDecodeImpl(byte[][] inputs, int[] inputOffsets,
                            int dataLen, int[] erasedIndexes,
                            byte[][] outputs, int[] outputOffsets) {
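    // Same algorithm as the ByteBuffer variant above, operating on byte
    // arrays with explicit offsets instead of buffer positions.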
for (int i = 0; i < erasedIndexes.length; i++) {
errSignature[i] = primitivePower[erasedIndexes[i]];
RSUtil.GF.substitute(inputs, inputOffsets, dataLen, outputs[i],
outputOffsets[i], primitivePower[i]);
}
RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets,
erasedIndexes.length, dataLen);
}

  @Override
protected void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
    /*
     * The passed parameters are friendly to callers but not to the
     * underlying implementation, so we have to adjust them before calling
     * doDecodeImpl.
     */
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
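    // erasedOrNotToReadIndexes covers every null input position: both the
    // erasures the caller asked to decode and the units it chose not to
    // read. The GF implementation needs an output buffer for each of them.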
// Prepare for adjustedOutputsParameter
// First reset the positions needed this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedByteArrayOutputsParameter[i] = null;
adjustedOutputOffsets[i] = 0;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedByteArrayOutputsParameter[j] = resetBuffer(
outputs[outputIdx], outputOffsets[outputIdx], dataLen);
adjustedOutputOffsets[j] = outputOffsets[outputIdx];
outputIdx++;
}
}
if (!found) {
throw new HadoopIllegalArgumentException(
"Inputs not fully corresponding to erasedIndexes in null places");
}
}
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedByteArrayOutputsParameter[i] == null) {
adjustedByteArrayOutputsParameter[i] = resetBuffer(
checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen);
adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
bufferIdx++;
}
}
doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes,
adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
}

  @Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer validInput = findFirstValidInput(inputs);
int dataLen = validInput.remaining();
    /*
     * The passed parameters are friendly to callers but not to the
     * underlying implementation, so we have to adjust them before calling
     * doDecodeImpl.
     */
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
// Prepare for adjustedDirectBufferOutputsParameter
// First reset the positions needed this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedDirectBufferOutputsParameter[i] = null;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedDirectBufferOutputsParameter[j] =
resetBuffer(outputs[outputIdx++]);
}
}
if (!found) {
throw new HadoopIllegalArgumentException(
"Inputs not fully corresponding to erasedIndexes in null places");
}
}
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedDirectBufferOutputsParameter[i] == null) {
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
buffer.position(0);
buffer.limit(dataLen);
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer);
bufferIdx++;
}
}
doDecodeImpl(inputs, erasedOrNotToReadIndexes,
adjustedDirectBufferOutputsParameter);
}
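
  /**
   * Get the shared byte array buffer for the given index, allocating it
   * lazily or replacing it if it can't hold bufferLen bytes.
   */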
private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) {
if (bytesArrayBuffers[idx] == null ||
bytesArrayBuffers[idx].length < bufferLen) {
bytesArrayBuffers[idx] = new byte[bufferLen];
}
return bytesArrayBuffers[idx];
}
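
  /**
   * Get the shared direct ByteBuffer for the given index, allocating it
   * lazily if absent or if its capacity is below bufferLen.
   */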
private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) {
if (directBuffers[idx] == null ||
directBuffers[idx].capacity() < bufferLen) {
directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen);
}
return directBuffers[idx];
}
}