| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.orc.impl; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| |
| import com.google.protobuf.ByteString; |
| import com.google.protobuf.CodedOutputStream; |
| |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.orc.CompressionCodec; |
| import org.apache.orc.EncryptionVariant; |
| import org.apache.orc.OrcFile; |
| import org.apache.orc.OrcProto; |
| import org.apache.orc.PhysicalWriter; |
| import org.apache.orc.TypeDescription; |
import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.WriterEncryptionKey;
import org.apache.orc.impl.writer.WriterEncryptionVariant;
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
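/**
 * A PhysicalWriter implementation that writes ORC files to a Hadoop
 * FileSystem. It buffers the index and data streams for each stripe, pads or
 * shortens HDFS blocks so that stripes do not straddle block boundaries, and
 * writes the stripe footers, file metadata, file footer, and postscript.
 * When column encryption is used, each encryption variant's streams and
 * statistics are tracked separately from the unencrypted data.
 */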
| public class PhysicalFsWriter implements PhysicalWriter { |
| private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); |
| |
| private static final int HDFS_BUFFER_SIZE = 256 * 1024; |
| |
| private FSDataOutputStream rawWriter; |
| private final DirectStream rawStream; |
| |
  // the compressed output stream used for the file metadata, footer, and stripe footers
| private OutStream compressStream; |
  // a protobuf CodedOutputStream wrapped around compressStream
| private CodedOutputStream codedCompressStream; |
| |
| private final Path path; |
| private final HadoopShims shims; |
| private final long blockSize; |
| private final int maxPadding; |
| private final StreamOptions compress; |
| private final OrcFile.CompressionStrategy compressionStrategy; |
| private final boolean addBlockPadding; |
| private final boolean writeVariableLengthBlocks; |
| private final VariantTracker unencrypted; |
| |
| private long headerLength; |
| private long stripeStart; |
  // The offset where the current HDFS block began. It is updated each time we
  // end a block early (a short, variable-length block), so that natural block
  // boundaries can be computed relative to it.
| private long blockOffset; |
| private int metadataLength; |
| private int stripeStatisticsLength = 0; |
| private int footerLength; |
| private int stripeNumber = 0; |
| |
| private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>(); |
| |
| public PhysicalFsWriter(FileSystem fs, |
| Path path, |
| OrcFile.WriterOptions opts |
| ) throws IOException { |
| this(fs, path, opts, new WriterEncryptionVariant[0]); |
| } |
| |
| public PhysicalFsWriter(FileSystem fs, |
| Path path, |
| OrcFile.WriterOptions opts, |
| WriterEncryptionVariant[] encryption |
| ) throws IOException { |
| this.path = path; |
| long defaultStripeSize = opts.getStripeSize(); |
| this.addBlockPadding = opts.getBlockPadding(); |
| if (opts.isEnforceBufferSize()) { |
| this.compress = new StreamOptions(opts.getBufferSize()); |
| } else { |
| this.compress = new StreamOptions( |
| WriterImpl.getEstimatedBufferSize(defaultStripeSize, |
| opts.getSchema().getMaximumId() + 1, |
| opts.getBufferSize())); |
| } |
| CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress()); |
    if (codec != null) {
| compress.withCodec(codec, codec.getDefaultOptions()); |
| } |
| this.compressionStrategy = opts.getCompressionStrategy(); |
| this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize); |
| this.blockSize = opts.getBlockSize(); |
| LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + |
| " compression: {}", path, defaultStripeSize, blockSize, compress); |
| rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE, |
| fs.getDefaultReplication(path), blockSize); |
| blockOffset = 0; |
| unencrypted = new VariantTracker(opts.getSchema(), compress); |
| writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); |
| shims = opts.getHadoopShims(); |
| rawStream = new DirectStream(rawWriter); |
| compressStream = new OutStream("stripe footer", compress, rawStream); |
| codedCompressStream = CodedOutputStream.newInstance(compressStream); |
| for(WriterEncryptionVariant variant: encryption) { |
| WriterEncryptionKey key = variant.getKeyDescription(); |
| StreamOptions encryptOptions = |
| new StreamOptions(unencrypted.options) |
| .withEncryption(key.getAlgorithm(), variant.getFileFooterKey()); |
| variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions)); |
| } |
| } |
| |
| /** |
| * Record the information about each column encryption variant. |
| * The unencrypted data and each encrypted column root are variants. |
| */ |
| protected static class VariantTracker { |
| // the streams that make up the current stripe |
| protected final Map<StreamName, BufferedStream> streams = new TreeMap<>(); |
| private final int rootColumn; |
| private final int lastColumn; |
| protected final StreamOptions options; |
| // a list for each column covered by this variant |
| // the elements in the list correspond to each stripe in the file |
| protected final List<OrcProto.ColumnStatistics>[] stripeStats; |
| protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>(); |
| protected final OrcProto.ColumnStatistics[] fileStats; |
| |
| VariantTracker(TypeDescription schema, StreamOptions options) { |
| rootColumn = schema.getId(); |
| lastColumn = schema.getMaximumId(); |
| this.options = options; |
| stripeStats = new List[schema.getMaximumId() - schema.getId() + 1]; |
| for(int i=0; i < stripeStats.length; ++i) { |
| stripeStats[i] = new ArrayList<>(); |
| } |
| fileStats = new OrcProto.ColumnStatistics[stripeStats.length]; |
| } |
| |
| public BufferedStream createStream(StreamName name) { |
| BufferedStream result = new BufferedStream(); |
| streams.put(name, result); |
| return result; |
| } |
| |
| /** |
| * Place the streams in the appropriate area while updating the sizes |
| * with the number of bytes in the area. |
| * @param area the area to write |
| * @param sizes the sizes of the areas |
| * @return the list of stream descriptions to add |
| */ |
| public List<OrcProto.Stream> placeStreams(StreamName.Area area, |
| SizeCounters sizes) { |
| List<OrcProto.Stream> result = new ArrayList<>(streams.size()); |
| for(Map.Entry<StreamName, BufferedStream> stream: streams.entrySet()) { |
| StreamName name = stream.getKey(); |
| BufferedStream bytes = stream.getValue(); |
| if (name.getArea() == area && !bytes.isSuppressed) { |
| OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder(); |
| long size = bytes.getOutputSize(); |
| if (area == StreamName.Area.INDEX) { |
| sizes.index += size; |
| } else { |
| sizes.data += size; |
| } |
| builder.setColumn(name.getColumn()) |
| .setKind(name.getKind()) |
| .setLength(size); |
| result.add(builder.build()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Write the streams in the appropriate area. |
| * @param area the area to write |
| * @param raw the raw stream to write to |
| */ |
| public void writeStreams(StreamName.Area area, |
| FSDataOutputStream raw) throws IOException { |
| for(Map.Entry<StreamName, BufferedStream> stream: streams.entrySet()) { |
| if (stream.getKey().getArea() == area) { |
| stream.getValue().spillToDiskAndClear(raw); |
| } |
| } |
| } |
| |
| /** |
     * Compute the size of the given column on disk for this stripe.
| * It excludes the index streams. |
| * @param column a column id |
| * @return the total number of bytes |
| */ |
| public long getFileBytes(int column) { |
| long result = 0; |
| if (column >= rootColumn && column <= lastColumn) { |
| for(Map.Entry<StreamName, BufferedStream> entry: streams.entrySet()) { |
| StreamName name = entry.getKey(); |
| if (name.getColumn() == column && |
| name.getArea() != StreamName.Area.INDEX) { |
| result += entry.getValue().getOutputSize(); |
| } |
| } |
| } |
| return result; |
| } |
| } |
| |
| VariantTracker getVariant(EncryptionVariant column) { |
| if (column == null) { |
| return unencrypted; |
| } |
| return variants.get(column); |
| } |
| |
| /** |
   * Get the number of bytes written for a given column by summing the sizes
   * of all of its streams that are not suppressed. Index streams are excluded.
   *
   * @param column column from which to get file size
   * @param variant the encryption variant, or null for the unencrypted data
   * @return number of bytes for the given column
| */ |
| @Override |
| public long getFileBytes(int column, WriterEncryptionVariant variant) { |
| return getVariant(variant).getFileBytes(column); |
| } |
| |
| @Override |
| public StreamOptions getStreamOptions() { |
| return unencrypted.options; |
| } |
| |
| private static final byte[] ZEROS = new byte[64*1024]; |
| |
| private static void writeZeros(OutputStream output, |
| long remaining) throws IOException { |
| while (remaining > 0) { |
| long size = Math.min(ZEROS.length, remaining); |
| output.write(ZEROS, 0, (int) size); |
| remaining -= size; |
| } |
| } |
| |
| /** |
   * Do any required shortening of the HDFS block or padding to avoid straddling
| * HDFS blocks. This is called before writing the current stripe. |
| * @param stripeSize the number of bytes in the current stripe |
| */ |
| private void padStripe(long stripeSize) throws IOException { |
| this.stripeStart = rawWriter.getPos(); |
| long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; |
| // We only have options if this isn't the first stripe in the block |
| if (previousBytesInBlock > 0) { |
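      // The stripe would reach or cross the block boundary, so either end the
      // current block early (variable-length blocks), pad with zeros up to the
      // boundary if the padding is within tolerance, or let the stripe
      // straddle the boundary.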
| if (previousBytesInBlock + stripeSize >= blockSize) { |
| // Try making a short block |
| if (writeVariableLengthBlocks && |
| shims.endVariableLengthBlock(rawWriter)) { |
| blockOffset = stripeStart; |
| } else if (addBlockPadding) { |
          // pad with zeros out to the next block boundary, but only if the
          // padding is within the tolerance
| long padding = blockSize - previousBytesInBlock; |
| if (padding <= maxPadding) { |
| writeZeros(rawWriter, padding); |
| stripeStart += padding; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * An output receiver that writes the ByteBuffers to the output stream |
| * as they are received. |
| */ |
| private static class DirectStream implements OutputReceiver { |
| private final FSDataOutputStream output; |
| |
| DirectStream(FSDataOutputStream output) { |
| this.output = output; |
| } |
| |
| @Override |
| public void output(ByteBuffer buffer) throws IOException { |
| output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), |
| buffer.remaining()); |
| } |
| |
| @Override |
| public void suppress() { |
| throw new UnsupportedOperationException("Can't suppress direct stream"); |
| } |
| } |
| |
| private void writeStripeFooter(OrcProto.StripeFooter footer, |
| SizeCounters sizes, |
| OrcProto.StripeInformation.Builder dirEntry) throws IOException { |
| footer.writeTo(codedCompressStream); |
| codedCompressStream.flush(); |
| compressStream.flush(); |
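    // The stripe footer is everything written after the index and data
    // streams, so its length is the bytes written since stripeStart minus the
    // stream sizes.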
| dirEntry.setOffset(stripeStart); |
| dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total()); |
| } |
| |
| /** |
   * Write the saved stripe statistics for an encrypted variant out to the file.
| * The streams that are written are added to the tracker.stripeStatsStreams. |
| * @param output the file we are writing to |
| * @param stripeNumber the number of stripes in the file |
| * @param tracker the variant to write out |
| */ |
| static void writeEncryptedStripeStatistics(DirectStream output, |
| int stripeNumber, |
| VariantTracker tracker |
| ) throws IOException { |
| StreamOptions options = new StreamOptions(tracker.options); |
| tracker.stripeStatsStreams.clear(); |
| for(int col = tracker.rootColumn; |
| col < tracker.rootColumn + tracker.stripeStats.length; ++col) { |
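      // Give each column's statistics stream its own IV based on the column,
      // the stream kind, and the stripe count.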
| options.modifyIv(CryptoUtils.modifyIvForStream(col, |
| OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1)); |
| OutStream stream = new OutStream("stripe stats for " + col, |
| options, output); |
| OrcProto.ColumnarStripeStatistics stats = |
| OrcProto.ColumnarStripeStatistics.newBuilder() |
| .addAllColStats(tracker.stripeStats[col - tracker.rootColumn]) |
| .build(); |
| long start = output.output.getPos(); |
| stats.writeTo(stream); |
| stream.flush(); |
| OrcProto.Stream description = OrcProto.Stream.newBuilder() |
| .setColumn(col) |
| .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS) |
| .setLength(output.output.getPos() - start) |
| .build(); |
| tracker.stripeStatsStreams.add(description); |
| } |
| } |
| |
| /** |
| * Merge the saved unencrypted stripe statistics into the Metadata section |
| * of the footer. |
| * @param builder the Metadata section of the file |
| * @param stripeCount the number of stripes in the file |
| * @param stats the stripe statistics |
| */ |
| static void setUnencryptedStripeStatistics(OrcProto.Metadata.Builder builder, |
| int stripeCount, |
| List<OrcProto.ColumnStatistics>[] stats) { |
| // Make the unencrypted stripe stats into lists of StripeStatistics. |
| builder.clearStripeStats(); |
| for(int s=0; s < stripeCount; ++s) { |
| OrcProto.StripeStatistics.Builder stripeStats = |
| OrcProto.StripeStatistics.newBuilder(); |
| for(List<OrcProto.ColumnStatistics> col: stats) { |
| stripeStats.addColStats(col.get(s)); |
| } |
| builder.addStripeStats(stripeStats.build()); |
| } |
| } |
| |
| static void setEncryptionStatistics(OrcProto.Encryption.Builder encryption, |
| int stripeNumber, |
| Collection<VariantTracker> variants |
| ) throws IOException { |
| int v = 0; |
| for(VariantTracker variant: variants) { |
| OrcProto.EncryptionVariant.Builder variantBuilder = |
| encryption.getVariantsBuilder(v++); |
| |
| // Add the stripe statistics streams to the variant description. |
| variantBuilder.clearStripeStatistics(); |
| variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams); |
| |
| // Serialize and encrypt the file statistics. |
| OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder(); |
| for(OrcProto.ColumnStatistics col: variant.fileStats) { |
| file.addColumn(col); |
| } |
| StreamOptions options = new StreamOptions(variant.options); |
| options.modifyIv(CryptoUtils.modifyIvForStream(variant.rootColumn, |
| OrcProto.Stream.Kind.FILE_STATISTICS, stripeNumber + 1)); |
| BufferedStream buffer = new BufferedStream(); |
| OutStream stream = new OutStream("stats for " + variant, options, buffer); |
| file.build().writeTo(stream); |
| stream.flush(); |
| variantBuilder.setFileStatistics(buffer.getBytes()); |
| } |
| } |
| |
| @Override |
| public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { |
| long stripeStatisticsStart = rawWriter.getPos(); |
| for(VariantTracker variant: variants.values()) { |
| writeEncryptedStripeStatistics(rawStream, stripeNumber, variant); |
| } |
| setUnencryptedStripeStatistics(builder, stripeNumber, |
| unencrypted.stripeStats); |
| long metadataStart = rawWriter.getPos(); |
| builder.build().writeTo(codedCompressStream); |
| codedCompressStream.flush(); |
| compressStream.flush(); |
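    // Record both region lengths; the postscript needs them so that readers
    // can locate the metadata and the encrypted stripe statistics.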
| this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart); |
| this.metadataLength = (int) (rawWriter.getPos() - metadataStart); |
| } |
| |
| static void addUnencryptedStatistics(OrcProto.Footer.Builder builder, |
| OrcProto.ColumnStatistics[] stats) { |
| for(OrcProto.ColumnStatistics stat: stats) { |
| builder.addStatistics(stat); |
| } |
| } |
| |
| @Override |
| public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { |
| if (variants.size() > 0) { |
| OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder(); |
| setEncryptionStatistics(encryption, stripeNumber, variants.values()); |
| } |
| addUnencryptedStatistics(builder, unencrypted.fileStats); |
| long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength; |
| builder.setContentLength(bodyLength); |
| builder.setHeaderLength(headerLength); |
| long startPosn = rawWriter.getPos(); |
| OrcProto.Footer footer = builder.build(); |
| footer.writeTo(codedCompressStream); |
| codedCompressStream.flush(); |
| compressStream.flush(); |
| this.footerLength = (int) (rawWriter.getPos() - startPosn); |
| } |
| |
| @Override |
| public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { |
| builder.setFooterLength(footerLength); |
| builder.setMetadataLength(metadataLength); |
| if (variants.size() > 0) { |
| builder.setStripeStatisticsLength(stripeStatisticsLength); |
| } |
| OrcProto.PostScript ps = builder.build(); |
| // need to write this uncompressed |
| long startPosn = rawWriter.getPos(); |
| ps.writeTo(rawWriter); |
| long length = rawWriter.getPos() - startPosn; |
| if (length > 255) { |
| throw new IllegalArgumentException("PostScript too large at " + length); |
| } |
| rawWriter.writeByte((int)length); |
| return rawWriter.getPos(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
    // We don't use the codec directly, but we do give it out in
    // getCompressionCodec; that is used in tests, for boolean checks, and in
    // StreamFactory. Some of the changes that would get rid of this pattern
    // require cross-project interface changes, so just return the codec for now.
| CompressionCodec codec = compress.getCodec(); |
| if (codec != null) { |
| OrcCodecPool.returnCodec(codec.getKind(), codec); |
| } |
| compress.withCodec(null, null); |
| rawWriter.close(); |
| rawWriter = null; |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| rawWriter.hflush(); |
| } |
| |
| @Override |
| public void appendRawStripe(ByteBuffer buffer, |
| OrcProto.StripeInformation.Builder dirEntry) throws IOException { |
| long start = rawWriter.getPos(); |
| int length = buffer.remaining(); |
| long availBlockSpace = blockSize - (start % blockSize); |
| |
| // see if stripe can fit in the current hdfs block, else pad the remaining |
| // space in the block |
| if (length < blockSize && length > availBlockSpace && |
| addBlockPadding) { |
| byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; |
      LOG.info("Padding ORC by {} bytes while merging...", availBlockSpace);
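      // Advance the recorded offset past the padding so the directory entry
      // points at the stripe data rather than at the zeros.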
| start += availBlockSpace; |
| while (availBlockSpace > 0) { |
| int writeLen = (int) Math.min(availBlockSpace, pad.length); |
| rawWriter.write(pad, 0, writeLen); |
| availBlockSpace -= writeLen; |
| } |
| } |
| rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), |
| length); |
| dirEntry.setOffset(start); |
| stripeNumber += 1; |
| } |
| |
| |
| /** |
| * This class is used to hold the contents of streams as they are buffered. |
 * The TreeWriters write to the outStream; as the buffers fill up, the codec
 * compresses the data and stores it in the output list. When the stripe is
 * written, the whole stream is written to the file.
| */ |
| static final class BufferedStream implements OutputReceiver { |
| private boolean isSuppressed = false; |
| private final List<ByteBuffer> output = new ArrayList<>(); |
| |
| @Override |
| public void output(ByteBuffer buffer) { |
| if (!isSuppressed) { |
| output.add(buffer); |
| } |
| } |
| |
| @Override |
| public void suppress() { |
| isSuppressed = true; |
| output.clear(); |
| } |
| |
| /** |
     * Write any saved buffers to the OutputStream if needed, and clear all of
     * the buffers.
| * @return true if the stream was written |
| */ |
| boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException { |
| if (!isSuppressed) { |
| for (ByteBuffer buffer: output) { |
| raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(), |
| buffer.remaining()); |
| } |
| output.clear(); |
| return true; |
| } |
| isSuppressed = false; |
| return false; |
| } |
| |
| /** |
     * Get the buffer as a protobuf ByteString and clear the BufferedStream.
| * @return the bytes |
| */ |
| ByteString getBytes() { |
| int len = output.size(); |
| if (len == 0) { |
| return ByteString.EMPTY; |
| } else { |
| ByteString result = ByteString.copyFrom(output.get(0)); |
| for (int i=1; i < output.size(); ++i) { |
| result = result.concat(ByteString.copyFrom(output.get(i))); |
| } |
| output.clear(); |
| return result; |
| } |
| } |
| |
| /** |
     * Get the stream as a single ByteBuffer. If the contents span multiple
     * buffers, they are merged and cleared.
| * @return a single ByteBuffer with the contents of the stream |
| */ |
| ByteBuffer getByteBuffer() { |
| ByteBuffer result; |
| if (output.size() == 1) { |
| result = output.get(0); |
| } else { |
| result = ByteBuffer.allocate((int) getOutputSize()); |
| for (ByteBuffer buffer : output) { |
| result.put(buffer); |
| } |
| output.clear(); |
| result.flip(); |
| } |
| return result; |
| } |
| |
| /** |
| * Get the number of bytes that will be written to the output. |
| * |
| * Assumes the stream writing into this receiver has already been flushed. |
| * @return number of bytes |
| */ |
| public long getOutputSize() { |
| long result = 0; |
| for (ByteBuffer buffer: output) { |
| result += buffer.remaining(); |
| } |
| return result; |
| } |
| } |
| |
| static class SizeCounters { |
| long index = 0; |
| long data = 0; |
| |
| long total() { |
| return index + data; |
| } |
| } |
| |
| void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, |
| SizeCounters sizes |
| ) throws IOException { |
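    // Streams are placed in the same order that finalizeStripe writes them:
    // unencrypted index, encrypted indexes, unencrypted data, encrypted data.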
| footerBuilder.addAllStreams( |
| unencrypted.placeStreams(StreamName.Area.INDEX, sizes)); |
| final long unencryptedIndexSize = sizes.index; |
| int v = 0; |
| for (VariantTracker variant: variants.values()) { |
| OrcProto.StripeEncryptionVariant.Builder builder = |
| footerBuilder.getEncryptionBuilder(v++); |
| builder.addAllStreams( |
| variant.placeStreams(StreamName.Area.INDEX, sizes)); |
| } |
| if (sizes.index != unencryptedIndexSize) { |
| // add a placeholder that covers the hole where the encrypted indexes are |
| footerBuilder.addStreams(OrcProto.Stream.newBuilder() |
| .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX) |
| .setLength(sizes.index - unencryptedIndexSize)); |
| } |
| footerBuilder.addAllStreams( |
| unencrypted.placeStreams(StreamName.Area.DATA, sizes)); |
| final long unencryptedDataSize = sizes.data; |
| v = 0; |
| for (VariantTracker variant: variants.values()) { |
| OrcProto.StripeEncryptionVariant.Builder builder = |
| footerBuilder.getEncryptionBuilder(v++); |
| builder.addAllStreams( |
| variant.placeStreams(StreamName.Area.DATA, sizes)); |
| } |
| if (sizes.data != unencryptedDataSize) { |
      // add a placeholder that covers the hole where the encrypted data streams are
| footerBuilder.addStreams(OrcProto.Stream.newBuilder() |
| .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA) |
| .setLength(sizes.data - unencryptedDataSize)); |
| } |
| } |
| |
| @Override |
| public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder, |
| OrcProto.StripeInformation.Builder dirEntry |
| ) throws IOException { |
| SizeCounters sizes = new SizeCounters(); |
| buildStreamList(footerBuilder, sizes); |
| |
| OrcProto.StripeFooter footer = footerBuilder.build(); |
| |
| // Do we need to pad the file so the stripe doesn't straddle a block boundary? |
| padStripe(sizes.total() + footer.getSerializedSize()); |
| |
| // write the unencrypted index streams |
| unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter); |
| // write the encrypted index streams |
| for (VariantTracker variant: variants.values()) { |
| variant.writeStreams(StreamName.Area.INDEX, rawWriter); |
| } |
| |
| // write the unencrypted data streams |
| unencrypted.writeStreams(StreamName.Area.DATA, rawWriter); |
    // write the encrypted data streams
| for (VariantTracker variant: variants.values()) { |
| variant.writeStreams(StreamName.Area.DATA, rawWriter); |
| } |
| |
| // Write out the footer. |
| writeStripeFooter(footer, sizes, dirEntry); |
| |
| // fill in the data sizes |
| dirEntry.setDataLength(sizes.data); |
| dirEntry.setIndexLength(sizes.index); |
| |
| stripeNumber += 1; |
| } |
| |
| @Override |
| public void writeHeader() throws IOException { |
| rawWriter.writeBytes(OrcFile.MAGIC); |
| headerLength = rawWriter.getPos(); |
| } |
| |
| @Override |
| public BufferedStream createDataStream(StreamName name) { |
| VariantTracker variant = getVariant(name.getEncryption()); |
| BufferedStream result = variant.streams.get(name); |
| if (result == null) { |
| result = new BufferedStream(); |
| variant.streams.put(name, result); |
| } |
| return result; |
| } |
| |
| private StreamOptions getOptions(OrcProto.Stream.Kind kind) { |
| return SerializationUtils.getCustomizedCodec(compress, compressionStrategy, |
| kind); |
| } |
| |
| protected OutputStream createIndexStream(StreamName name) { |
| BufferedStream buffer = createDataStream(name); |
| VariantTracker tracker = getVariant(name.getEncryption()); |
| StreamOptions options = |
| SerializationUtils.getCustomizedCodec(tracker.options, |
| compressionStrategy, name.getKind()); |
| if (options.isEncrypted()) { |
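      // Don't modify the tracker's shared options; copy them before setting
      // the stream-specific IV.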
| if (options == tracker.options) { |
| options = new StreamOptions(options); |
| } |
| options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1)); |
| } |
| return new OutStream(name.toString(), options, buffer); |
| } |
| |
| @Override |
| public void writeIndex(StreamName name, |
| OrcProto.RowIndex.Builder index |
| ) throws IOException { |
| OutputStream stream = createIndexStream(name); |
| index.build().writeTo(stream); |
| stream.flush(); |
| } |
| |
| @Override |
| public void writeBloomFilter(StreamName name, |
| OrcProto.BloomFilterIndex.Builder bloom |
| ) throws IOException { |
| OutputStream stream = createIndexStream(name); |
| bloom.build().writeTo(stream); |
| stream.flush(); |
| } |
| |
| @Override |
| public void writeStatistics(StreamName name, |
| OrcProto.ColumnStatistics.Builder statistics |
| ) { |
| VariantTracker tracker = getVariant(name.getEncryption()); |
| if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) { |
| tracker.fileStats[name.getColumn() - tracker.rootColumn] = |
| statistics.build(); |
| } else { |
| tracker.stripeStats[name.getColumn() - tracker.rootColumn] |
| .add(statistics.build()); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return path.toString(); |
| } |
| } |