blob: 880e4985bf35fb905838aa3a6eaa2f2039a64c05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.bulkload.pipeline;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteIllegalStateException;
/**
* A {@link PipelineBlock}, which converts stream of bytes supplied as byte[] arrays to an array of char[] using
* the specified encoding. Decoding errors (malformed input and unmappable characters) are to handled by dropping
* the erroneous input, appending the coder's replacement value to the output buffer, and resuming the coding operation.
*/
public class CharsetDecoderBlock extends PipelineBlock<byte[], char[]> {
/** Empty portion. */
public static final char[] EMPTY_PORTION = new char[0];
/** Charset decoder */
private final CharsetDecoder charsetDecoder;
/** Leftover bytes (partial characters) from the last batch,
* or null if everything was processed. */
private byte[] leftover;
/** True once we've reached the end of input. */
private boolean isEndOfInput;
/**
* Creates charset decoder block.
*
* @param charset The charset encoding to decode bytes from.
*/
public CharsetDecoderBlock(Charset charset) {
charsetDecoder = charset.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE);
isEndOfInput = false;
leftover = null;
}
/** {@inheritDoc} */
@Override public void accept(byte[] data, boolean isLastAppend) throws IgniteCheckedException {
assert nextBlock != null;
assert !isEndOfInput : "convertBytes() called after end of input";
isEndOfInput = isLastAppend;
if (leftover == null && data.length == 0) {
nextBlock.accept(EMPTY_PORTION, isLastAppend);
return;
}
ByteBuffer dataBuf;
if (leftover == null)
dataBuf = ByteBuffer.wrap(data);
else {
dataBuf = ByteBuffer.allocate(leftover.length + data.length);
dataBuf.put(leftover).put(data);
dataBuf.flip();
leftover = null;
}
int outBufLen = (int)Math.ceil(charsetDecoder.maxCharsPerByte() * (data.length + 1));
assert outBufLen > 0;
CharBuffer outBuf = CharBuffer.allocate(outBufLen);
for (;;) {
CoderResult res = charsetDecoder.decode(dataBuf, outBuf, isEndOfInput);
if (res.isUnderflow()) {
// End of input buffer reached. Either skip the partial character at the end or wait for the next batch.
if (!isEndOfInput && dataBuf.remaining() > 0)
leftover = Arrays.copyOfRange(dataBuf.array(),
dataBuf.arrayOffset() + dataBuf.position(), dataBuf.limit());
// See {@link CharsetDecoder} class javadoc for the protocol.
if (isEndOfInput)
charsetDecoder.flush(outBuf);
if (outBuf.position() > 0)
nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
isEndOfInput);
break;
}
// Not enough space in the output buffer, flush it and retry.
if (res.isOverflow()) {
assert outBuf.position() > 0;
nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
isEndOfInput);
outBuf.flip();
continue;
}
assert !res.isMalformed() && !res.isUnmappable();
// We're not supposed to reach this point with the current implementation.
// The code below will fire exception if Oracle implementation of CharsetDecoder will be changed in future.
throw new IgniteIllegalStateException("Unknown CharsetDecoder state");
}
}
}