/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;

import java.nio.ByteBuffer;

/**
* A raw erasure decoder in the Reed-Solomon (RS) code scheme, implemented in
* pure Java for environments where a native implementation isn't available.
* Please always prefer native implementations when possible.
*
* Currently this implementation also computes and decodes units that were not
* requested to be read, due to a limitation in the underlying GF
* implementation. This will be addressed in HADOOP-11871.
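*
* A minimal usage sketch; the 6+3 schema, chunk size and single erasure
* below are illustrative only, not fixed by this class:
* <pre>{@code
*   RSRawDecoder decoder = new RSRawDecoder(6, 3);
*   byte[][] inputs = new byte[9][];      // null entries mark units not read
*   // ... fill at least 6 entries with equal-length 1024-byte chunks ...
*   int[] erasedIndexes = {2};            // say data unit d2 was lost
*   byte[][] outputs = new byte[1][1024]; // one buffer per erased unit
*   decoder.decode(inputs, erasedIndexes, outputs);
* }</pre>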
*/
@InterfaceAudience.Private
public class RSRawDecoder extends AbstractRawErasureDecoder {
// State for building and solving the Vandermonde system used in decoding
private int[] errSignature;
private int[] primitivePower;

/**
* We need a set of reusable buffers for either the bytes-array decoding
* version or the direct-buffer decoding version, but normally not both.
*
* For output, in addition to the valid buffers passed in by the caller, we
* need extra buffers for the internal decoding implementation. The caller
* should provide at least one and at most numParityUnits output buffers; the
* remaining ones are borrowed from bytesArrayBuffers (for the bytes-array
* version) or directBuffers (for the direct-buffer version).
*/
// Reused buffers for decoding with byte arrays
private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][];
private byte[][] adjustedByteArrayOutputsParameter =
new byte[getNumParityUnits()][];
private int[] adjustedOutputOffsets = new int[getNumParityUnits()];

// Reused buffers for decoding with direct ByteBuffers
private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()];
private ByteBuffer[] adjustedDirectBufferOutputsParameter =
new ByteBuffer[getNumParityUnits()];

public RSRawDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
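// The GF field bounds the schema: with the default GF(2^8), at most
// getFieldSize() - 1 = 255 units in total are supported.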
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
throw new HadoopIllegalArgumentException(
"Invalid numDataUnits and numParityUnits");
}
this.errSignature = new int[numParityUnits];
this.primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
}

@Override
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
// Make copies so the caller's arrays are not mutated.
ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
int[] newErasedIndexes = new int[erasedIndexes.length];
ByteBuffer[] newOutputs = new ByteBuffer[outputs.length];
// Adjust the order to match the underlying parity-first layout.
adjustOrder(inputs, newInputs,
erasedIndexes, newErasedIndexes, outputs, newOutputs);
super.decode(newInputs, newErasedIndexes, newOutputs);
}

@Override
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
// Make copies so the caller's arrays are not mutated.
byte[][] newInputs = new byte[inputs.length][];
int[] newErasedIndexes = new int[erasedIndexes.length];
byte[][] newOutputs = new byte[outputs.length][];
// Adjust the order to match the underlying parity-first layout.
adjustOrder(inputs, newInputs,
erasedIndexes, newErasedIndexes, outputs, newOutputs);
super.decode(newInputs, newErasedIndexes, newOutputs);
}

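/**
* Core GF decoding: build the error signature from the primitive powers of
* the erased positions, evaluate the inputs at each primitive power, then
* solve the resulting Vandermonde system to recover the erased units.
*/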
private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer valid = findFirstValidInput(inputs);
int dataLen = valid.remaining();
for (int i = 0; i < erasedIndexes.length; i++) {
errSignature[i] = primitivePower[erasedIndexes[i]];
RSUtil.GF.substitute(inputs, dataLen, outputs[i], primitivePower[i]);
}
RSUtil.GF.solveVandermondeSystem(errSignature,
outputs, erasedIndexes.length);
}

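/**
* Same as above, but for on-heap byte arrays addressed with offsets.
*/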
private void doDecodeImpl(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
for (int i = 0; i < erasedIndexes.length; i++) {
errSignature[i] = primitivePower[erasedIndexes[i]];
RSUtil.GF.substitute(inputs, inputOffsets, dataLen, outputs[i],
outputOffsets[i], primitivePower[i]);
}
RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets,
erasedIndexes.length, dataLen);
}

@Override
protected void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
/*
* The passed parameters are friendly to callers but not to the underlying
* implementation, so we adjust them before calling doDecodeImpl.
*/
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
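// Note this collects every null input position, not only those the caller
// asked to decode via erasedIndexes.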
// Prepare adjustedByteArrayOutputsParameter:
// first reset the positions to be used this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedByteArrayOutputsParameter[i] = null;
adjustedOutputOffsets[i] = 0;
}
// Use the caller-passed buffers at the erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedByteArrayOutputsParameter[j] = resetBuffer(
outputs[outputIdx], outputOffsets[outputIdx], dataLen);
adjustedOutputOffsets[j] = outputOffsets[outputIdx];
outputIdx++;
}
}
if (!found) {
throw new HadoopIllegalArgumentException(
"erasedIndexes requests a position that is not null (erased) in inputs");
}
}
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedByteArrayOutputsParameter[i] == null) {
adjustedByteArrayOutputsParameter[i] = resetBuffer(
checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen);
adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
bufferIdx++;
}
}
doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes,
adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
}

@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer validInput = findFirstValidInput(inputs);
int dataLen = validInput.remaining();
/*
* The passed parameters are friendly to callers but not to the underlying
* implementation, so we adjust them before calling doDecodeImpl.
*/
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
// Prepare adjustedDirectBufferOutputsParameter:
// first reset the positions to be used this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedDirectBufferOutputsParameter[i] = null;
}
// Use the caller-passed buffers at the erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedDirectBufferOutputsParameter[j] =
resetBuffer(outputs[outputIdx++], dataLen);
}
}
if (!found) {
throw new HadoopIllegalArgumentException(
"erasedIndexes requests a position that is not null (erased) in inputs");
}
}
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedDirectBufferOutputsParameter[i] == null) {
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
buffer.position(0);
buffer.limit(dataLen);
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer, dataLen);
bufferIdx++;
}
}
doDecodeImpl(inputs, erasedOrNotToReadIndexes,
adjustedDirectBufferOutputsParameter);
}

/*
* Convert from data-units-first order to parity-units-first order.
*/
private <T> void adjustOrder(T[] inputs, T[] inputs2,
int[] erasedIndexes, int[] erasedIndexes2,
T[] outputs, T[] outputs2) {
// Example:
// d0 d1 d2 d3 d4 d5 : p0 p1 p2 => p0 p1 p2 : d0 d1 d2 d3 d4 d5
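// With 6 data and 3 parity units, erasedIndexes [2, 6] (d2 and p0) becomes
// [0, 5]: p0 maps to 6 - 6 = 0, and d2 maps to 2 + 3 = 5, parity first.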
System.arraycopy(inputs, getNumDataUnits(), inputs2,
0, getNumParityUnits());
System.arraycopy(inputs, 0, inputs2,
getNumParityUnits(), getNumDataUnits());
int numErasedDataUnits = 0, numErasedParityUnits = 0;
int idx = 0;
for (int i = 0; i < erasedIndexes.length; i++) {
if (erasedIndexes[i] >= getNumDataUnits()) {
erasedIndexes2[idx++] = erasedIndexes[i] - getNumDataUnits();
numErasedParityUnits++;
}
}
for (int i = 0; i < erasedIndexes.length; i++) {
if (erasedIndexes[i] < getNumDataUnits()) {
erasedIndexes2[idx++] = erasedIndexes[i] + getNumParityUnits();
numErasedDataUnits++;
}
}
// Move the outputs for erased parity units to the front,
System.arraycopy(outputs, numErasedDataUnits, outputs2,
0, numErasedParityUnits);
// followed by the outputs for erased data units
System.arraycopy(outputs, 0, outputs2,
numErasedParityUnits, numErasedDataUnits);
}

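/**
* Lazily allocate, or reuse if large enough, the shared on-heap buffer at
* index idx, guaranteeing a capacity of at least bufferLen bytes.
*/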
private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) {
if (bytesArrayBuffers[idx] == null ||
bytesArrayBuffers[idx].length < bufferLen) {
bytesArrayBuffers[idx] = new byte[bufferLen];
}
return bytesArrayBuffers[idx];
}

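/**
* Lazily allocate, or reuse if large enough, the shared direct ByteBuffer at
* index idx, guaranteeing a capacity of at least bufferLen bytes.
*/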
private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) {
if (directBuffers[idx] == null ||
directBuffers[idx].capacity() < bufferLen) {
directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen);
}
return directBuffers[idx];
}
}