blob: f6cfd579b0183a32c3b6b917b12a0dc224a0b638 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.thirdparty.orc;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
 * An output stream that collects written bytes in a fixed-size buffer and
 * hands full buffers to an {@link OutputReceiver}. When a codec is supplied,
 * each full buffer is passed through the codec first and every chunk handed
 * to the receiver is preceded by a {@link #HEADER_SIZE}-byte header holding
 * the chunk length and an "original (uncompressed)" flag.
 */
class OutStream extends PositionedOutputStream {

  /**
   * Destination for the buffers this stream has finished filling.
   */
  interface OutputReceiver {
    /**
     * Output the given buffer to the final destination
     * @param buffer the buffer to output
     * @throws IOException
     */
    void output(ByteBuffer buffer) throws IOException;
  }

  /** Number of bytes occupied by the chunk header written by writeHeader. */
  static final int HEADER_SIZE = 3;

  /**
   * Exclusive upper bound for a chunk length that fits in the header: the
   * header has HEADER_SIZE * 8 bits and the lowest one is the "original"
   * flag, which leaves 23 bits for the length.
   */
  private static final int MAX_CHUNK_LENGTH = 1 << (HEADER_SIZE * 8 - 1);

  private final String name;
  private final OutputReceiver receiver;
  // if enabled the stream will be suppressed when writing stripe
  private boolean suppress;

  /**
   * Stores the uncompressed bytes that have been serialized, but not
   * compressed yet. When this fills, we compress the entire buffer.
   */
  private ByteBuffer current = null;

  /**
   * Stores the compressed bytes until we have a full buffer and then outputs
   * them to the receiver. If no compression is being done, this (and overflow)
   * will always be null and the current buffer will be sent directly to the
   * receiver.
   */
  private ByteBuffer compressed = null;

  /**
   * Since the compressed buffer may start with contents from previous
   * compression blocks, we allocate an overflow buffer so that the
   * output of the codec can be split between the two buffers. After the
   * compressed buffer is sent to the receiver, the overflow buffer becomes
   * the new compressed buffer.
   */
  private ByteBuffer overflow = null;
  private final int bufferSize;
  private final CompressionCodec codec;
  // bytes (headers included) of the chunks completed since the last flush;
  // only maintained when a codec is in use
  private long compressedBytes = 0;
  // bytes written since the last completed chunk when a codec is in use,
  // otherwise bytes written since the last flush
  private long uncompressedBytes = 0;

  /**
   * Create a stream. No buffer is allocated until the first write.
   * @param name the name returned by {@link #toString()}
   * @param bufferSize the number of payload bytes held by each buffer
   * @param codec the codec to compress with, or null for no compression
   * @param receiver where finished buffers are sent
   * @throws IllegalArgumentException if bufferSize is not positive, or if a
   *         codec is given and bufferSize cannot be encoded in the chunk header
   * @throws IOException declared for callers; not thrown by this constructor
   */
  OutStream(String name,
            int bufferSize,
            CompressionCodec codec,
            OutputReceiver receiver) throws IOException {
    // A zero-sized buffer can never accept a byte, so write(byte[], int, int)
    // would spin forever; a negative size only fails later on allocation.
    if (bufferSize <= 0) {
      throw new IllegalArgumentException(
          "Illegal value of ORC buffer size: " + bufferSize);
    }
    // Chunks are only given a header when a codec is used; an uncompressed
    // chunk can be as long as bufferSize, so that length must fit in it.
    if (codec != null && bufferSize >= MAX_CHUNK_LENGTH) {
      throw new IllegalArgumentException(
          "Illegal value of ORC compression buffer size: " + bufferSize);
    }
    this.name = name;
    this.bufferSize = bufferSize;
    this.codec = codec;
    this.receiver = receiver;
    this.suppress = false;
  }

  /**
   * Flush everything that is buffered to the receiver and reset the
   * suppress flag.
   * @throws IOException if the receiver fails
   */
  public void clear() throws IOException {
    flush();
    suppress = false;
  }

  /**
   * Write the length of the compressed bytes. Life is much easier if the
   * header is constant length, so just use 3 bytes. Considering most of the
   * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
   * be plenty. We also use the low bit for whether it is the original or
   * compressed bytes.
   * @param buffer the buffer to write the header to
   * @param position the position in the buffer to write at
   * @param val the size in the file
   * @param original is it uncompressed
   */
  private static void writeHeader(ByteBuffer buffer,
                                  int position,
                                  int val,
                                  boolean original) {
    // little-endian: byte 0 holds the flag plus the low 7 bits of val
    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
    buffer.put(position + 1, (byte) (val >> 7));
    buffer.put(position + 2, (byte) (val >> 15));
  }

  /**
   * Allocate a fresh input buffer. When compressing, room for a header is
   * reserved at the front and pre-filled as an "original" chunk of
   * bufferSize bytes; the write position starts right after it.
   */
  private void getNewInputBuffer() {
    if (codec == null) {
      current = ByteBuffer.allocate(bufferSize);
    } else {
      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
      writeHeader(current, 0, bufferSize, true);
      current.position(HEADER_SIZE);
    }
  }

  /**
   * Allocate a new output buffer if we are compressing.
   */
  private ByteBuffer getNewOutputBuffer() {
    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
  }

  /**
   * Switch the current buffer from writing to reading: the limit becomes the
   * write position and the position moves to the first payload byte (past the
   * reserved header when compressing).
   */
  private void flip() {
    current.limit(current.position());
    current.position(codec == null ? 0 : HEADER_SIZE);
  }

  /**
   * Write a single byte, spilling the current buffer first if it is full.
   * @param i the byte to write (only the low 8 bits are used)
   * @throws IOException if spilling to the receiver fails
   */
  @Override
  public void write(int i) throws IOException {
    if (current == null) {
      getNewInputBuffer();
    }
    if (current.remaining() < 1) {
      spill();
    }
    uncompressedBytes += 1;
    current.put((byte) i);
  }

  /**
   * Write a range of bytes, spilling the current buffer every time it fills.
   * @param bytes the source array
   * @param offset index of the first byte to write
   * @param length number of bytes to write
   * @throws IOException if spilling to the receiver fails
   */
  @Override
  public void write(byte[] bytes, int offset, int length) throws IOException {
    if (current == null) {
      getNewInputBuffer();
    }
    int remaining = Math.min(current.remaining(), length);
    current.put(bytes, offset, remaining);
    uncompressedBytes += remaining;
    length -= remaining;
    while (length != 0) {
      // the buffer is full at this point, so make room and keep copying
      spill();
      offset += remaining;
      remaining = Math.min(current.remaining(), length);
      current.put(bytes, offset, remaining);
      uncompressedBytes += remaining;
      length -= remaining;
    }
  }

  /**
   * Move the contents of the current buffer towards the receiver. Without a
   * codec the buffer is handed over as is. With a codec the payload is
   * compressed into the compressed/overflow buffers; if the codec reports
   * that it did not compress the data (returns false), the original bytes
   * are sent instead with an "original" header. Does nothing when the
   * current buffer holds no payload.
   * @throws IOException if the codec or the receiver fails
   */
  private void spill() throws IOException {
    // if there isn't anything in the current buffer, don't spill
    if (current == null ||
        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
      return;
    }
    flip();
    if (codec == null) {
      receiver.output(current);
      getNewInputBuffer();
    } else {
      if (compressed == null) {
        compressed = getNewOutputBuffer();
      } else if (overflow == null) {
        overflow = getNewOutputBuffer();
      }
      // remember where this chunk's header goes and skip over it
      int sizePosn = compressed.position();
      compressed.position(compressed.position() + HEADER_SIZE);
      if (codec.compress(current, compressed, overflow)) {
        uncompressedBytes = 0;
        // move position back to after the header
        current.position(HEADER_SIZE);
        current.limit(current.capacity());
        // find the total bytes in the chunk
        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
        if (overflow != null) {
          totalBytes += overflow.position();
        }
        compressedBytes += totalBytes + HEADER_SIZE;
        writeHeader(compressed, sizePosn, totalBytes, false);
        // if we have less than the next header left, spill it.
        if (compressed.remaining() < HEADER_SIZE) {
          compressed.flip();
          receiver.output(compressed);
          compressed = overflow;
          overflow = null;
        }
      } else {
        compressedBytes += uncompressedBytes + HEADER_SIZE;
        uncompressedBytes = 0;
        // we are using the original, but need to spill the current
        // compressed buffer first. So back up to where we started,
        // flip it and add it to done.
        if (sizePosn != 0) {
          compressed.position(sizePosn);
          compressed.flip();
          receiver.output(compressed);
          compressed = null;
          // if we have an overflow, clear it and make it the new compress
          // buffer
          if (overflow != null) {
            overflow.clear();
            compressed = overflow;
            overflow = null;
          }
        } else {
          compressed.clear();
          if (overflow != null) {
            overflow.clear();
          }
        }
        // now add the current buffer into the done list and get a new one.
        current.position(0);
        // update the header with the current length
        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
        receiver.output(current);
        getNewInputBuffer();
      }
    }
  }

  /**
   * Record the current position of this stream. Without a codec only the
   * uncompressed byte count is recorded; with a codec the compressed byte
   * count is recorded first, followed by the uncompressed byte count.
   * @param recorder the recorder the position values are added to
   * @throws IOException declared for callers; propagated from the recorder
   *         if it throws
   */
  void getPosition(PositionRecorder recorder) throws IOException {
    if (codec == null) {
      recorder.addPosition(uncompressedBytes);
    } else {
      recorder.addPosition(compressedBytes);
      recorder.addPosition(uncompressedBytes);
    }
  }

  /**
   * Spill the current buffer, send any pending compressed bytes to the
   * receiver, then reset both byte counters and drop all buffers.
   * @throws IOException if the codec or the receiver fails
   */
  @Override
  public void flush() throws IOException {
    spill();
    if (compressed != null && compressed.position() != 0) {
      compressed.flip();
      receiver.output(compressed);
      compressed = null;
    }
    uncompressedBytes = 0;
    compressedBytes = 0;
    overflow = null;
    current = null;
  }

  /**
   * @return the name this stream was created with
   */
  @Override
  public String toString() {
    return name;
  }

  /**
   * @return the summed capacity of the buffers currently allocated by this
   *         stream (current, compressed and overflow)
   */
  @Override
  public long getBufferSize() {
    long result = 0;
    if (current != null) {
      result += current.capacity();
    }
    if (compressed != null) {
      result += compressed.capacity();
    }
    if (overflow != null) {
      result += overflow.capacity();
    }
    return result;
  }

  /**
   * Set suppress flag
   */
  public void suppress() {
    suppress = true;
  }

  /**
   * Returns the state of suppress flag
   * @return value of suppress flag
   */
  public boolean isSuppressed() {
    return suppress;
  }
}