/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionCodec;
import org.apache.orc.EncryptionVariant;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.writer.WriterEncryptionKey;
import org.apache.orc.impl.writer.WriterEncryptionVariant;
import org.apache.orc.impl.writer.StreamOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
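
/**
 * A {@link PhysicalWriter} that writes an ORC file to a Hadoop
 * {@link FileSystem}, buffering each stripe's streams in memory and flushing
 * them with optional HDFS block padding.
 *
 * <p>A minimal usage sketch; the configuration, path, and schema below are
 * illustrative assumptions, and most callers go through
 * {@code OrcFile.createWriter} rather than constructing this class directly:
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   FileSystem fs = FileSystem.getLocal(conf);
 *   OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
 *       .setSchema(TypeDescription.fromString("struct<x:int>"));
 *   PhysicalWriter writer =
 *       new PhysicalFsWriter(fs, new Path("/tmp/example.orc"), opts);
 *   writer.writeHeader();
 * }</pre>
 */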
public class PhysicalFsWriter implements PhysicalWriter {
private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private FSDataOutputStream rawWriter;
private final DirectStream rawStream;
  // the compressed stream used to write metadata sections
  // (stripe footers, file metadata, and the file footer)
  private OutStream compressStream;
  // a protobuf CodedOutputStream wrapped around compressStream
  private CodedOutputStream codedCompressStream;
private final Path path;
private final HadoopShims shims;
private final long blockSize;
private final int maxPadding;
private final StreamOptions compress;
private final OrcFile.CompressionStrategy compressionStrategy;
private final boolean addBlockPadding;
private final boolean writeVariableLengthBlocks;
private final VariantTracker unencrypted;
private long headerLength;
private long stripeStart;
  // The position where we most recently ended a short (variable-length)
  // block; block boundary calculations are made relative to this offset.
  private long blockOffset;
private int metadataLength;
private int stripeStatisticsLength = 0;
private int footerLength;
private int stripeNumber = 0;
private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>();
public PhysicalFsWriter(FileSystem fs,
Path path,
OrcFile.WriterOptions opts
) throws IOException {
this(fs, path, opts, new WriterEncryptionVariant[0]);
}
public PhysicalFsWriter(FileSystem fs,
Path path,
OrcFile.WriterOptions opts,
WriterEncryptionVariant[] encryption
) throws IOException {
this.path = path;
long defaultStripeSize = opts.getStripeSize();
this.addBlockPadding = opts.getBlockPadding();
if (opts.isEnforceBufferSize()) {
this.compress = new StreamOptions(opts.getBufferSize());
} else {
this.compress = new StreamOptions(
WriterImpl.getEstimatedBufferSize(defaultStripeSize,
opts.getSchema().getMaximumId() + 1,
opts.getBufferSize()));
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
}
this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
" compression: {}", path, defaultStripeSize, blockSize, compress);
rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
fs.getDefaultReplication(path), blockSize);
blockOffset = 0;
unencrypted = new VariantTracker(opts.getSchema(), compress);
writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
shims = opts.getHadoopShims();
rawStream = new DirectStream(rawWriter);
compressStream = new OutStream("stripe footer", compress, rawStream);
codedCompressStream = CodedOutputStream.newInstance(compressStream);
for(WriterEncryptionVariant variant: encryption) {
WriterEncryptionKey key = variant.getKeyDescription();
StreamOptions encryptOptions =
new StreamOptions(unencrypted.options)
.withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
}
}
/**
* Record the information about each column encryption variant.
* The unencrypted data and each encrypted column root are variants.
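   *
   * <p>An illustrative sketch of how per-column statistics are indexed (the
   * column ids here are assumptions for exposition): for a variant rooted at
   * column 3 covering columns 3..4, each column's statistics live at
   * {@code stripeStats[column - rootColumn]}, with one list entry per stripe:
   * <pre>{@code
   *   // stripeStats[0] holds column 3's statistics, one entry per stripe
   *   // stripeStats[1] holds column 4's statistics, one entry per stripe
   *   OrcProto.ColumnStatistics stat = tracker.stripeStats[4 - 3].get(1);
   * }</pre>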
*/
protected static class VariantTracker {
// the streams that make up the current stripe
protected final Map<StreamName, BufferedStream> streams = new TreeMap<>();
private final int rootColumn;
private final int lastColumn;
protected final StreamOptions options;
// a list for each column covered by this variant
// the elements in the list correspond to each stripe in the file
protected final List<OrcProto.ColumnStatistics>[] stripeStats;
protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>();
protected final OrcProto.ColumnStatistics[] fileStats;
VariantTracker(TypeDescription schema, StreamOptions options) {
rootColumn = schema.getId();
lastColumn = schema.getMaximumId();
this.options = options;
stripeStats = new List[schema.getMaximumId() - schema.getId() + 1];
for(int i=0; i < stripeStats.length; ++i) {
stripeStats[i] = new ArrayList<>();
}
fileStats = new OrcProto.ColumnStatistics[stripeStats.length];
}
public BufferedStream createStream(StreamName name) {
BufferedStream result = new BufferedStream();
streams.put(name, result);
return result;
}
/**
* Place the streams in the appropriate area while updating the sizes
* with the number of bytes in the area.
* @param area the area to write
* @param sizes the sizes of the areas
* @return the list of stream descriptions to add
*/
public List<OrcProto.Stream> placeStreams(StreamName.Area area,
SizeCounters sizes) {
List<OrcProto.Stream> result = new ArrayList<>(streams.size());
for(Map.Entry<StreamName, BufferedStream> stream: streams.entrySet()) {
StreamName name = stream.getKey();
BufferedStream bytes = stream.getValue();
if (name.getArea() == area && !bytes.isSuppressed) {
OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder();
long size = bytes.getOutputSize();
if (area == StreamName.Area.INDEX) {
sizes.index += size;
} else {
sizes.data += size;
}
builder.setColumn(name.getColumn())
.setKind(name.getKind())
.setLength(size);
result.add(builder.build());
}
}
return result;
}
/**
* Write the streams in the appropriate area.
* @param area the area to write
* @param raw the raw stream to write to
*/
public void writeStreams(StreamName.Area area,
FSDataOutputStream raw) throws IOException {
for(Map.Entry<StreamName, BufferedStream> stream: streams.entrySet()) {
if (stream.getKey().getArea() == area) {
stream.getValue().spillToDiskAndClear(raw);
}
}
}
/**
     * Compute the size of the given column on disk for this stripe.
* It excludes the index streams.
* @param column a column id
* @return the total number of bytes
*/
public long getFileBytes(int column) {
long result = 0;
if (column >= rootColumn && column <= lastColumn) {
for(Map.Entry<StreamName, BufferedStream> entry: streams.entrySet()) {
StreamName name = entry.getKey();
if (name.getColumn() == column &&
name.getArea() != StreamName.Area.INDEX) {
result += entry.getValue().getOutputSize();
}
}
}
return result;
}
}
VariantTracker getVariant(EncryptionVariant column) {
if (column == null) {
return unencrypted;
}
return variants.get(column);
}
  /**
   * Get the number of file bytes for a given column by finding all of its
   * streams (not suppressed) and summing their sizes. Index streams are
   * excluded.
   *
   * @param column column from which to get file size
   * @return number of bytes for the given column
   */
@Override
public long getFileBytes(int column, WriterEncryptionVariant variant) {
return getVariant(variant).getFileBytes(column);
}
@Override
public StreamOptions getStreamOptions() {
return unencrypted.options;
}
private static final byte[] ZEROS = new byte[64*1024];
private static void writeZeros(OutputStream output,
long remaining) throws IOException {
while (remaining > 0) {
long size = Math.min(ZEROS.length, remaining);
output.write(ZEROS, 0, (int) size);
remaining -= size;
}
}
/**
   * Do any required shortening of the HDFS block or padding to avoid straddling
* HDFS blocks. This is called before writing the current stripe.
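   *
   * <p>A worked example with assumed numbers: for a 256 MiB block where the
   * stripe would start 255 MiB into the block and is 4 MiB long, the stripe
   * would straddle the boundary, so the writer either ends the block early
   * (variable-length blocks) or, if the 1 MiB of padding is within
   * {@code maxPadding}, writes 1 MiB of zeros so the stripe starts at the
   * next block boundary.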
* @param stripeSize the number of bytes in the current stripe
*/
private void padStripe(long stripeSize) throws IOException {
this.stripeStart = rawWriter.getPos();
long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
// We only have options if this isn't the first stripe in the block
if (previousBytesInBlock > 0) {
if (previousBytesInBlock + stripeSize >= blockSize) {
// Try making a short block
if (writeVariableLengthBlocks &&
shims.endVariableLengthBlock(rawWriter)) {
blockOffset = stripeStart;
} else if (addBlockPadding) {
// if we cross the block boundary, figure out what we should do
long padding = blockSize - previousBytesInBlock;
if (padding <= maxPadding) {
writeZeros(rawWriter, padding);
stripeStart += padding;
}
}
}
}
}
/**
* An output receiver that writes the ByteBuffers to the output stream
* as they are received.
*/
private static class DirectStream implements OutputReceiver {
private final FSDataOutputStream output;
DirectStream(FSDataOutputStream output) {
this.output = output;
}
@Override
public void output(ByteBuffer buffer) throws IOException {
output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
}
@Override
public void suppress() {
throw new UnsupportedOperationException("Can't suppress direct stream");
}
}
private void writeStripeFooter(OrcProto.StripeFooter footer,
SizeCounters sizes,
OrcProto.StripeInformation.Builder dirEntry) throws IOException {
footer.writeTo(codedCompressStream);
codedCompressStream.flush();
compressStream.flush();
dirEntry.setOffset(stripeStart);
dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total());
}
/**
   * Write the saved encrypted stripe statistics in a variant out to the file.
* The streams that are written are added to the tracker.stripeStatsStreams.
* @param output the file we are writing to
* @param stripeNumber the number of stripes in the file
* @param tracker the variant to write out
*/
static void writeEncryptedStripeStatistics(DirectStream output,
int stripeNumber,
VariantTracker tracker
) throws IOException {
StreamOptions options = new StreamOptions(tracker.options);
tracker.stripeStatsStreams.clear();
for(int col = tracker.rootColumn;
col < tracker.rootColumn + tracker.stripeStats.length; ++col) {
options.modifyIv(CryptoUtils.modifyIvForStream(col,
OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1));
OutStream stream = new OutStream("stripe stats for " + col,
options, output);
OrcProto.ColumnarStripeStatistics stats =
OrcProto.ColumnarStripeStatistics.newBuilder()
.addAllColStats(tracker.stripeStats[col - tracker.rootColumn])
.build();
long start = output.output.getPos();
stats.writeTo(stream);
stream.flush();
OrcProto.Stream description = OrcProto.Stream.newBuilder()
.setColumn(col)
.setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS)
.setLength(output.output.getPos() - start)
.build();
tracker.stripeStatsStreams.add(description);
}
}
/**
* Merge the saved unencrypted stripe statistics into the Metadata section
* of the footer.
* @param builder the Metadata section of the file
* @param stripeCount the number of stripes in the file
* @param stats the stripe statistics
*/
static void setUnencryptedStripeStatistics(OrcProto.Metadata.Builder builder,
int stripeCount,
List<OrcProto.ColumnStatistics>[] stats) {
// Make the unencrypted stripe stats into lists of StripeStatistics.
builder.clearStripeStats();
for(int s=0; s < stripeCount; ++s) {
OrcProto.StripeStatistics.Builder stripeStats =
OrcProto.StripeStatistics.newBuilder();
for(List<OrcProto.ColumnStatistics> col: stats) {
stripeStats.addColStats(col.get(s));
}
builder.addStripeStats(stripeStats.build());
}
}
static void setEncryptionStatistics(OrcProto.Encryption.Builder encryption,
int stripeNumber,
Collection<VariantTracker> variants
) throws IOException {
int v = 0;
for(VariantTracker variant: variants) {
OrcProto.EncryptionVariant.Builder variantBuilder =
encryption.getVariantsBuilder(v++);
// Add the stripe statistics streams to the variant description.
variantBuilder.clearStripeStatistics();
variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams);
// Serialize and encrypt the file statistics.
OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder();
for(OrcProto.ColumnStatistics col: variant.fileStats) {
file.addColumn(col);
}
StreamOptions options = new StreamOptions(variant.options);
options.modifyIv(CryptoUtils.modifyIvForStream(variant.rootColumn,
OrcProto.Stream.Kind.FILE_STATISTICS, stripeNumber + 1));
BufferedStream buffer = new BufferedStream();
OutStream stream = new OutStream("stats for " + variant, options, buffer);
file.build().writeTo(stream);
stream.flush();
variantBuilder.setFileStatistics(buffer.getBytes());
}
}
@Override
public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
long stripeStatisticsStart = rawWriter.getPos();
for(VariantTracker variant: variants.values()) {
writeEncryptedStripeStatistics(rawStream, stripeNumber, variant);
}
setUnencryptedStripeStatistics(builder, stripeNumber,
unencrypted.stripeStats);
long metadataStart = rawWriter.getPos();
builder.build().writeTo(codedCompressStream);
codedCompressStream.flush();
compressStream.flush();
this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart);
this.metadataLength = (int) (rawWriter.getPos() - metadataStart);
}
static void addUnencryptedStatistics(OrcProto.Footer.Builder builder,
OrcProto.ColumnStatistics[] stats) {
for(OrcProto.ColumnStatistics stat: stats) {
builder.addStatistics(stat);
}
}
@Override
public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
if (variants.size() > 0) {
OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder();
setEncryptionStatistics(encryption, stripeNumber, variants.values());
}
addUnencryptedStatistics(builder, unencrypted.fileStats);
long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength;
builder.setContentLength(bodyLength);
builder.setHeaderLength(headerLength);
long startPosn = rawWriter.getPos();
OrcProto.Footer footer = builder.build();
footer.writeTo(codedCompressStream);
codedCompressStream.flush();
compressStream.flush();
this.footerLength = (int) (rawWriter.getPos() - startPosn);
}
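  /**
   * Write the uncompressed PostScript followed by its length as the final
   * byte of the file. A hedged reader-side sketch of how the tail is located,
   * assuming the last bytes of the file are in {@code buf}:
   * <pre>{@code
   *   int psLen = buf[buf.length - 1] & 0xff;  // last byte of the file
   *   // the PostScript occupies the psLen bytes immediately before it, and
   *   // records the footer and metadata lengths that precede the PostScript
   * }</pre>
   */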
@Override
public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
builder.setFooterLength(footerLength);
builder.setMetadataLength(metadataLength);
if (variants.size() > 0) {
builder.setStripeStatisticsLength(stripeStatisticsLength);
}
OrcProto.PostScript ps = builder.build();
// need to write this uncompressed
long startPosn = rawWriter.getPos();
ps.writeTo(rawWriter);
long length = rawWriter.getPos() - startPosn;
if (length > 255) {
throw new IllegalArgumentException("PostScript too large at " + length);
}
rawWriter.writeByte((int)length);
return rawWriter.getPos();
}
@Override
public void close() throws IOException {
    // We don't use the codec directly, but we do give out the codec in
    // getCompressionCodec; that is used in tests, for boolean checks, and in
    // StreamFactory. Some of the changes that would get rid of this pattern
    // require cross-project interface changes, so just return the codec for
    // now.
CompressionCodec codec = compress.getCodec();
if (codec != null) {
OrcCodecPool.returnCodec(codec.getKind(), codec);
}
compress.withCodec(null, null);
rawWriter.close();
rawWriter = null;
}
@Override
public void flush() throws IOException {
rawWriter.hflush();
}
@Override
public void appendRawStripe(ByteBuffer buffer,
OrcProto.StripeInformation.Builder dirEntry) throws IOException {
long start = rawWriter.getPos();
int length = buffer.remaining();
long availBlockSpace = blockSize - (start % blockSize);
    // See if the stripe can fit in the current HDFS block; if not, pad the
    // remaining space in the block so the stripe starts on a block boundary.
    if (length < blockSize && length > availBlockSpace &&
        addBlockPadding) {
      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
      LOG.info("Padding ORC by {} bytes while merging...", availBlockSpace);
start += availBlockSpace;
while (availBlockSpace > 0) {
int writeLen = (int) Math.min(availBlockSpace, pad.length);
rawWriter.write(pad, 0, writeLen);
availBlockSpace -= writeLen;
}
}
rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
length);
dirEntry.setOffset(start);
stripeNumber += 1;
}
/**
   * This class holds the contents of streams as they are buffered.
   * The TreeWriters write to the OutStream; as buffers fill up, the codec
   * compresses the data and stores the buffers in the output list. When the
   * stripe is written, the whole stream is written to the file.
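   *
   * <p>A hedged lifecycle sketch (the names here are illustrative):
   * <pre>{@code
   *   BufferedStream buffered = new BufferedStream();
   *   OutStream out = new OutStream("example", options, buffered);
   *   out.write(data);
   *   out.flush();                        // compressed buffers land in the list
   *   long size = buffered.getOutputSize();
   *   buffered.spillToDiskAndClear(raw);  // write the buffers to the file
   * }</pre>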
*/
static final class BufferedStream implements OutputReceiver {
private boolean isSuppressed = false;
private final List<ByteBuffer> output = new ArrayList<>();
@Override
public void output(ByteBuffer buffer) {
if (!isSuppressed) {
output.add(buffer);
}
}
@Override
public void suppress() {
isSuppressed = true;
output.clear();
}
    /**
     * Write any saved buffers to the OutputStream if needed, and clear all
     * the buffers.
     * @param raw the raw stream to write to
     * @return true if the stream was written
     */
boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
if (!isSuppressed) {
for (ByteBuffer buffer: output) {
raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
}
output.clear();
return true;
}
isSuppressed = false;
return false;
}
/**
     * Get the buffer as a protobuf ByteString and clear the BufferedStream.
* @return the bytes
*/
ByteString getBytes() {
int len = output.size();
if (len == 0) {
return ByteString.EMPTY;
} else {
ByteString result = ByteString.copyFrom(output.get(0));
for (int i=1; i < output.size(); ++i) {
result = result.concat(ByteString.copyFrom(output.get(i)));
}
output.clear();
return result;
}
}
    /**
     * Get the stream as a single ByteBuffer. When multiple buffers were
     * saved, they are merged into a newly allocated buffer and the saved
     * list is cleared; a single saved buffer is returned directly.
     * @return a single ByteBuffer with the contents of the stream
     */
ByteBuffer getByteBuffer() {
ByteBuffer result;
if (output.size() == 1) {
result = output.get(0);
} else {
result = ByteBuffer.allocate((int) getOutputSize());
for (ByteBuffer buffer : output) {
result.put(buffer);
}
output.clear();
result.flip();
}
return result;
}
/**
* Get the number of bytes that will be written to the output.
*
* Assumes the stream writing into this receiver has already been flushed.
* @return number of bytes
*/
public long getOutputSize() {
long result = 0;
for (ByteBuffer buffer: output) {
result += buffer.remaining();
}
return result;
}
}
static class SizeCounters {
long index = 0;
long data = 0;
long total() {
return index + data;
}
}
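  /**
   * Add the descriptions of all unsuppressed streams to the stripe footer,
   * index area first and data area second, updating {@code sizes} as they
   * are placed. Encrypted streams are recorded in their variant's footer
   * section, and placeholder ENCRYPTED_INDEX and ENCRYPTED_DATA entries
   * cover their bytes so unencrypted readers can skip over them. A sketch
   * of the resulting physical order within a stripe:
   * <pre>{@code
   *   // [unencrypted index streams][encrypted index streams (placeholder)]
   *   // [unencrypted data streams ][encrypted data streams  (placeholder)]
   * }</pre>
   */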
void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder,
SizeCounters sizes
) throws IOException {
footerBuilder.addAllStreams(
unencrypted.placeStreams(StreamName.Area.INDEX, sizes));
final long unencryptedIndexSize = sizes.index;
int v = 0;
for (VariantTracker variant: variants.values()) {
OrcProto.StripeEncryptionVariant.Builder builder =
footerBuilder.getEncryptionBuilder(v++);
builder.addAllStreams(
variant.placeStreams(StreamName.Area.INDEX, sizes));
}
if (sizes.index != unencryptedIndexSize) {
// add a placeholder that covers the hole where the encrypted indexes are
footerBuilder.addStreams(OrcProto.Stream.newBuilder()
.setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX)
.setLength(sizes.index - unencryptedIndexSize));
}
footerBuilder.addAllStreams(
unencrypted.placeStreams(StreamName.Area.DATA, sizes));
final long unencryptedDataSize = sizes.data;
v = 0;
for (VariantTracker variant: variants.values()) {
OrcProto.StripeEncryptionVariant.Builder builder =
footerBuilder.getEncryptionBuilder(v++);
builder.addAllStreams(
variant.placeStreams(StreamName.Area.DATA, sizes));
}
if (sizes.data != unencryptedDataSize) {
      // add a placeholder that covers the hole where the encrypted data is
footerBuilder.addStreams(OrcProto.Stream.newBuilder()
.setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA)
.setLength(sizes.data - unencryptedDataSize));
}
}
@Override
public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
OrcProto.StripeInformation.Builder dirEntry
) throws IOException {
SizeCounters sizes = new SizeCounters();
buildStreamList(footerBuilder, sizes);
OrcProto.StripeFooter footer = footerBuilder.build();
// Do we need to pad the file so the stripe doesn't straddle a block boundary?
padStripe(sizes.total() + footer.getSerializedSize());
// write the unencrypted index streams
unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
// write the encrypted index streams
for (VariantTracker variant: variants.values()) {
variant.writeStreams(StreamName.Area.INDEX, rawWriter);
}
// write the unencrypted data streams
unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
    // write the encrypted data streams
for (VariantTracker variant: variants.values()) {
variant.writeStreams(StreamName.Area.DATA, rawWriter);
}
// Write out the footer.
writeStripeFooter(footer, sizes, dirEntry);
// fill in the data sizes
dirEntry.setDataLength(sizes.data);
dirEntry.setIndexLength(sizes.index);
stripeNumber += 1;
}
@Override
public void writeHeader() throws IOException {
rawWriter.writeBytes(OrcFile.MAGIC);
headerLength = rawWriter.getPos();
}
@Override
public BufferedStream createDataStream(StreamName name) {
VariantTracker variant = getVariant(name.getEncryption());
BufferedStream result = variant.streams.get(name);
if (result == null) {
result = new BufferedStream();
variant.streams.put(name, result);
}
return result;
}
private StreamOptions getOptions(OrcProto.Stream.Kind kind) {
return SerializationUtils.getCustomizedCodec(compress, compressionStrategy,
kind);
}
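  /**
   * Create an OutStream for an index-area stream, applying any kind-specific
   * codec customization and, when the column is encrypted, a per-stream
   * initialization vector derived from the stream name and the one-based
   * stripe number.
   */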
protected OutputStream createIndexStream(StreamName name) {
BufferedStream buffer = createDataStream(name);
VariantTracker tracker = getVariant(name.getEncryption());
StreamOptions options =
SerializationUtils.getCustomizedCodec(tracker.options,
compressionStrategy, name.getKind());
if (options.isEncrypted()) {
if (options == tracker.options) {
options = new StreamOptions(options);
}
options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1));
}
return new OutStream(name.toString(), options, buffer);
}
@Override
public void writeIndex(StreamName name,
OrcProto.RowIndex.Builder index
) throws IOException {
OutputStream stream = createIndexStream(name);
index.build().writeTo(stream);
stream.flush();
}
@Override
public void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom
) throws IOException {
OutputStream stream = createIndexStream(name);
bloom.build().writeTo(stream);
stream.flush();
}
@Override
public void writeStatistics(StreamName name,
OrcProto.ColumnStatistics.Builder statistics
) {
VariantTracker tracker = getVariant(name.getEncryption());
if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) {
tracker.fileStats[name.getColumn() - tracker.rootColumn] =
statistics.build();
} else {
tracker.stripeStats[name.getColumn() - tracker.rootColumn]
.add(statistics.build());
}
}
@Override
public String toString() {
return path.toString();
}
}