|  | /** | 
|  | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | * or more contributor license agreements.  See the NOTICE file | 
|  | * distributed with this work for additional information | 
|  | * regarding copyright ownership.  The ASF licenses this file | 
|  | * to you under the Apache License, Version 2.0 (the | 
|  | * "License"); you may not use this file except in compliance | 
|  | * with the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | package org.apache.orc; | 
|  |  | 
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.MemoryManager;
import org.apache.orc.impl.OrcTail;
import org.apache.orc.impl.ReaderImpl;
import org.apache.orc.impl.WriterImpl;
|  |  | 
|  | /** | 
|  | * Contains factory methods to read or write ORC files. | 
|  | */ | 
|  | public class OrcFile { | 
/**
 * Magic string identifying ORC files. NOTE(review): presumably this is the
 * marker written to / checked in the file by WriterImpl / ReaderImpl; confirm
 * the exact location there.
 */
public static final String MAGIC = "ORC";
|  |  | 
/**
 * Version numbers of the ORC file format, kept so that non-forward-compatible
 * changes can be introduced in the future. To make the numbers easy for users
 * to understand, each version is named after the Hive release that first
 * wrote that variant of the format.
 *
 * <p>Consequently, if you add new encodings or other non-forward-compatible
 * changes to ORC files (changes that prevent an old reader from reading the
 * new format), you should add a constant named after the next Hive release.
 * Non-forward-compatible changes should never be added in patch releases.
 *
 * <p>Do not make any change that breaks backwards compatibility, i.e. that
 * would prevent the new reader from reading ORC files generated by any
 * released version of Hive.
 */
public enum Version {
  /** Format first written by Hive 0.11. */
  V_0_11("0.11", 0, 11),
  /** Format first written by Hive 0.12. */
  V_0_12("0.12", 0, 12);

  /** The newest format version declared in this enum. */
  public static final Version CURRENT = V_0_12;

  // Human-readable label such as "0.12"; byName() matches it exactly.
  private final String name;
  private final int major;
  private final int minor;

  Version(String name, int major, int minor) {
    this.name = name;
    this.major = major;
    this.minor = minor;
  }

  /**
   * Look up a format version by its human-readable name.
   *
   * @param name the exact name, e.g. {@code "0.12"}
   * @return the first constant whose name equals {@code name}
   * @throws IllegalArgumentException if no constant has that name (this
   *     includes a {@code null} name)
   */
  public static Version byName(String name) {
    final Version[] known = values();
    int idx = 0;
    // Walk forward until a constant with a matching name is found.
    while (idx < known.length && !known[idx].name.equals(name)) {
      idx++;
    }
    if (idx == known.length) {
      throw new IllegalArgumentException("Unknown ORC version " + name);
    }
    return known[idx];
  }

  /**
   * @return the human-readable name of this version, e.g. {@code "0.12"}
   */
  public String getName() {
    return this.name;
  }

  /**
   * @return the major component of the version number
   */
  public int getMajor() {
    return this.major;
  }

  /**
   * @return the minor component of the version number
   */
  public int getMinor() {
    return this.minor;
  }
}
|  |  | 
/**
 * Records the version of the writer in terms of which bugs have been fixed.
 * For bugs in the writer where the old readers already read the new data
 * correctly, bump this version instead of the Version.
 */
public enum WriterVersion {
  ORIGINAL(0),
  HIVE_8732(1), // corrupted stripe/file maximum column statistics
  HIVE_4243(2), // use real column names from Hive tables
  HIVE_12055(3), // vectorized writer
  HIVE_13083(4), // decimal writer updating present stream wrongly

  // Don't use any magic numbers here except for the below:
  FUTURE(Integer.MAX_VALUE); // a version from a future writer

  private final int id;

  WriterVersion(int id) {
    this.id = id;
  }

  /**
   * @return the integer id of this writer version
   */
  public int getId() {
    return id;
  }

  /**
   * Lookup table indexed by id. It covers ids 0..max, where max is the largest
   * id other than FUTURE's (FUTURE uses Integer.MAX_VALUE and cannot be
   * indexed). Slots for ids that no constant uses stay null.
   */
  private static final WriterVersion[] BY_ID;

  static {
    // Assumes few non-negative ids close to zero.
    int max = -1;
    for (WriterVersion v : values()) {
      if (v.id < 0) {
        throw new AssertionError("Negative writer version id " + v.id + " for " + v);
      }
      if (v.id != FUTURE.id && v.id > max) {
        max = v.id;
      }
    }
    BY_ID = new WriterVersion[max + 1];
    for (WriterVersion v : values()) {
      if (v.id < BY_ID.length) {
        BY_ID[v.id] = v;
      }
    }
  }

  /**
   * Convert the integer from OrcProto.PostScript.writerVersion to the
   * enumeration, with unknown versions mapped to FUTURE.
   *
   * <p>Negative inputs are also treated as unknown. An unsigned 32-bit value
   * of 2^31 or more read into a Java int becomes negative, and previously it
   * caused an ArrayIndexOutOfBoundsException. Ids inside the table range that
   * no constant uses also map to FUTURE instead of null.
   *
   * @param val the serialized writer version
   * @return the corresponding enumeration value, never null
   */
  public static WriterVersion from(int val) {
    if (val < 0 || val >= BY_ID.length) {
      return FUTURE;
    }
    WriterVersion known = BY_ID[val];
    return known == null ? FUTURE : known;
  }
}
/**
 * The most recent bug-fix level defined in WriterVersion (FUTURE excluded).
 * NOTE(review): presumably this is the value this library's writer records in
 * new files; confirm in WriterImpl.
 */
public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083;
|  |  | 
/**
 * Strategy hint for how column data is encoded. NOTE(review): by name,
 * presumably trades encoding speed against encoded size; how each constant is
 * acted on is decided by the writer implementation, which is not in this file.
 */
public enum EncodingStrategy {
  /** Favor speed. */
  SPEED,
  /** Favor compactness. */
  COMPRESSION
}
|  |  | 
/**
 * Strategy hint for the generic compression codec. NOTE(review): by name,
 * presumably trades compression speed against compressed size; the writer
 * implementation (not in this file) decides how each constant is applied.
 */
public enum CompressionStrategy {
  /** Favor speed. */
  SPEED,
  /** Favor compactness. */
  COMPRESSION
}
|  |  | 
/**
 * Unused: every member of this class is static. The constructor is
 * {@code protected} rather than private, so subclasses can still call it.
 */
protected OrcFile() {
  // nothing to initialize
}
|  |  | 
|  | public static class ReaderOptions { | 
|  | private final Configuration conf; | 
|  | private FileSystem filesystem; | 
|  | private long maxLength = Long.MAX_VALUE; | 
|  | private OrcTail orcTail; | 
|  | // TODO: We can generalize FileMetada interface. Make OrcTail implement FileMetadata interface | 
|  | // and remove this class altogether. Both footer caching and llap caching just needs OrcTail. | 
|  | // For now keeping this around to avoid complex surgery | 
|  | private FileMetadata fileMetadata; | 
|  |  | 
|  | public ReaderOptions(Configuration conf) { | 
|  | this.conf = conf; | 
|  | } | 
|  |  | 
|  | public ReaderOptions filesystem(FileSystem fs) { | 
|  | this.filesystem = fs; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | public ReaderOptions maxLength(long val) { | 
|  | maxLength = val; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | public ReaderOptions orcTail(OrcTail tail) { | 
|  | this.orcTail = tail; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | public Configuration getConfiguration() { | 
|  | return conf; | 
|  | } | 
|  |  | 
|  | public FileSystem getFilesystem() { | 
|  | return filesystem; | 
|  | } | 
|  |  | 
|  | public long getMaxLength() { | 
|  | return maxLength; | 
|  | } | 
|  |  | 
|  | public OrcTail getOrcTail() { | 
|  | return orcTail; | 
|  | } | 
|  |  | 
|  | public ReaderOptions fileMetadata(final FileMetadata metadata) { | 
|  | fileMetadata = metadata; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | public FileMetadata getFileMetadata() { | 
|  | return fileMetadata; | 
|  | } | 
|  | } | 
|  |  | 
|  | public static ReaderOptions readerOptions(Configuration conf) { | 
|  | return new ReaderOptions(conf); | 
|  | } | 
|  |  | 
|  | public static Reader createReader(Path path, | 
|  | ReaderOptions options) throws IOException { | 
|  | return new ReaderImpl(path, options); | 
|  | } | 
|  |  | 
|  | public interface WriterContext { | 
|  | Writer getWriter(); | 
|  | } | 
|  |  | 
|  | public interface WriterCallback { | 
|  | void preStripeWrite(WriterContext context) throws IOException; | 
|  | void preFooterWrite(WriterContext context) throws IOException; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Options for creating ORC file writers. | 
|  | */ | 
|  | public static class WriterOptions { | 
|  | private final Configuration configuration; | 
|  | private FileSystem fileSystemValue = null; | 
|  | private TypeDescription schema = null; | 
|  | private long stripeSizeValue; | 
|  | private long blockSizeValue; | 
|  | private int rowIndexStrideValue; | 
|  | private int bufferSizeValue; | 
|  | private boolean enforceBufferSize = false; | 
|  | private boolean blockPaddingValue; | 
|  | private CompressionKind compressValue; | 
|  | private MemoryManager memoryManagerValue; | 
|  | private Version versionValue; | 
|  | private WriterCallback callback; | 
|  | private EncodingStrategy encodingStrategy; | 
|  | private CompressionStrategy compressionStrategy; | 
|  | private double paddingTolerance; | 
|  | private String bloomFilterColumns; | 
|  | private double bloomFilterFpp; | 
|  |  | 
|  | protected WriterOptions(Properties tableProperties, Configuration conf) { | 
|  | configuration = conf; | 
|  | memoryManagerValue = getStaticMemoryManager(conf); | 
|  | stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf); | 
|  | blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf); | 
|  | rowIndexStrideValue = | 
|  | (int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf); | 
|  | bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(tableProperties, | 
|  | conf); | 
|  | blockPaddingValue = | 
|  | OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf); | 
|  | compressValue = | 
|  | CompressionKind.valueOf(OrcConf.COMPRESS.getString(tableProperties, | 
|  | conf).toUpperCase()); | 
|  | String versionName = OrcConf.WRITE_FORMAT.getString(tableProperties, | 
|  | conf); | 
|  | versionValue = Version.byName(versionName); | 
|  | String enString = OrcConf.ENCODING_STRATEGY.getString(tableProperties, | 
|  | conf); | 
|  | encodingStrategy = EncodingStrategy.valueOf(enString); | 
|  |  | 
|  | String compString = | 
|  | OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf); | 
|  | compressionStrategy = CompressionStrategy.valueOf(compString); | 
|  |  | 
|  | paddingTolerance = | 
|  | OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf); | 
|  |  | 
|  | bloomFilterColumns = OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties, | 
|  | conf); | 
|  | bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties, | 
|  | conf); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Provide the filesystem for the path, if the client has it available. | 
|  | * If it is not provided, it will be found from the path. | 
|  | */ | 
|  | public WriterOptions fileSystem(FileSystem value) { | 
|  | fileSystemValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Set the stripe size for the file. The writer stores the contents of the | 
|  | * stripe in memory until this memory limit is reached and the stripe | 
|  | * is flushed to the HDFS file and the next stripe started. | 
|  | */ | 
|  | public WriterOptions stripeSize(long value) { | 
|  | stripeSizeValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Set the file system block size for the file. For optimal performance, | 
|  | * set the block size to be multiple factors of stripe size. | 
|  | */ | 
|  | public WriterOptions blockSize(long value) { | 
|  | blockSizeValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Set the distance between entries in the row index. The minimum value is | 
|  | * 1000 to prevent the index from overwhelming the data. If the stride is | 
|  | * set to 0, no indexes will be included in the file. | 
|  | */ | 
|  | public WriterOptions rowIndexStride(int value) { | 
|  | rowIndexStrideValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * The size of the memory buffers used for compressing and storing the | 
|  | * stripe in memory. NOTE: ORC writer may choose to use smaller buffer | 
|  | * size based on stripe size and number of columns for efficient stripe | 
|  | * writing and memory utilization. To enforce writer to use the requested | 
|  | * buffer size use enforceBufferSize(). | 
|  | */ | 
|  | public WriterOptions bufferSize(int value) { | 
|  | bufferSizeValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Enforce writer to use requested buffer size instead of estimating | 
|  | * buffer size based on stripe size and number of columns. | 
|  | * See bufferSize() method for more info. | 
|  | * Default: false | 
|  | */ | 
|  | public WriterOptions enforceBufferSize() { | 
|  | enforceBufferSize = true; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets whether the HDFS blocks are padded to prevent stripes from | 
|  | * straddling blocks. Padding improves locality and thus the speed of | 
|  | * reading, but costs space. | 
|  | */ | 
|  | public WriterOptions blockPadding(boolean value) { | 
|  | blockPaddingValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the encoding strategy that is used to encode the data. | 
|  | */ | 
|  | public WriterOptions encodingStrategy(EncodingStrategy strategy) { | 
|  | encodingStrategy = strategy; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the tolerance for block padding as a percentage of stripe size. | 
|  | */ | 
|  | public WriterOptions paddingTolerance(double value) { | 
|  | paddingTolerance = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Comma separated values of column names for which bloom filter is to be created. | 
|  | */ | 
|  | public WriterOptions bloomFilterColumns(String columns) { | 
|  | bloomFilterColumns = columns; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Specify the false positive probability for bloom filter. | 
|  | * @param fpp - false positive probability | 
|  | * @return this | 
|  | */ | 
|  | public WriterOptions bloomFilterFpp(double fpp) { | 
|  | bloomFilterFpp = fpp; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the generic compression that is used to compress the data. | 
|  | */ | 
|  | public WriterOptions compress(CompressionKind value) { | 
|  | compressValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Set the schema for the file. This is a required parameter. | 
|  | * @param schema the schema for the file. | 
|  | * @return this | 
|  | */ | 
|  | public WriterOptions setSchema(TypeDescription schema) { | 
|  | this.schema = schema; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the version of the file that will be written. | 
|  | */ | 
|  | public WriterOptions version(Version value) { | 
|  | versionValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Add a listener for when the stripe and file are about to be closed. | 
|  | * @param callback the object to be called when the stripe is closed | 
|  | * @return this | 
|  | */ | 
|  | public WriterOptions callback(WriterCallback callback) { | 
|  | this.callback = callback; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * A package local option to set the memory manager. | 
|  | */ | 
|  | protected WriterOptions memory(MemoryManager value) { | 
|  | memoryManagerValue = value; | 
|  | return this; | 
|  | } | 
|  |  | 
|  | public boolean getBlockPadding() { | 
|  | return blockPaddingValue; | 
|  | } | 
|  |  | 
|  | public long getBlockSize() { | 
|  | return blockSizeValue; | 
|  | } | 
|  |  | 
|  | public String getBloomFilterColumns() { | 
|  | return bloomFilterColumns; | 
|  | } | 
|  |  | 
|  | public FileSystem getFileSystem() { | 
|  | return fileSystemValue; | 
|  | } | 
|  |  | 
|  | public Configuration getConfiguration() { | 
|  | return configuration; | 
|  | } | 
|  |  | 
|  | public TypeDescription getSchema() { | 
|  | return schema; | 
|  | } | 
|  |  | 
|  | public long getStripeSize() { | 
|  | return stripeSizeValue; | 
|  | } | 
|  |  | 
|  | public CompressionKind getCompress() { | 
|  | return compressValue; | 
|  | } | 
|  |  | 
|  | public WriterCallback getCallback() { | 
|  | return callback; | 
|  | } | 
|  |  | 
|  | public Version getVersion() { | 
|  | return versionValue; | 
|  | } | 
|  |  | 
|  | public MemoryManager getMemoryManager() { | 
|  | return memoryManagerValue; | 
|  | } | 
|  |  | 
|  | public int getBufferSize() { | 
|  | return bufferSizeValue; | 
|  | } | 
|  |  | 
|  | public boolean isEnforceBufferSize() { | 
|  | return enforceBufferSize; | 
|  | } | 
|  |  | 
|  | public int getRowIndexStride() { | 
|  | return rowIndexStrideValue; | 
|  | } | 
|  |  | 
|  | public CompressionStrategy getCompressionStrategy() { | 
|  | return compressionStrategy; | 
|  | } | 
|  |  | 
|  | public EncodingStrategy getEncodingStrategy() { | 
|  | return encodingStrategy; | 
|  | } | 
|  |  | 
|  | public double getPaddingTolerance() { | 
|  | return paddingTolerance; | 
|  | } | 
|  |  | 
|  | public double getBloomFilterFpp() { | 
|  | return bloomFilterFpp; | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Create a set of writer options based on a configuration. | 
|  | * @param conf the configuration to use for values | 
|  | * @return A WriterOptions object that can be modified | 
|  | */ | 
|  | public static WriterOptions writerOptions(Configuration conf) { | 
|  | return new WriterOptions(null, conf); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Create a set of write options based on a set of table properties and | 
|  | * configuration. | 
|  | * @param tableProperties the properties of the table | 
|  | * @param conf the configuration of the query | 
|  | * @return a WriterOptions object that can be modified | 
|  | */ | 
|  | public static WriterOptions writerOptions(Properties tableProperties, | 
|  | Configuration conf) { | 
|  | return new WriterOptions(tableProperties, conf); | 
|  | } | 
|  |  | 
|  | private static ThreadLocal<MemoryManager> memoryManager = null; | 
|  |  | 
|  | private static synchronized MemoryManager getStaticMemoryManager( | 
|  | final Configuration conf) { | 
|  | if (memoryManager == null) { | 
|  | memoryManager = new ThreadLocal<MemoryManager>() { | 
|  | @Override | 
|  | protected MemoryManager initialValue() { | 
|  | return new MemoryManager(conf); | 
|  | } | 
|  | }; | 
|  | } | 
|  | return memoryManager.get(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Create an ORC file writer. This is the public interface for creating | 
|  | * writers going forward and new options will only be added to this method. | 
|  | * @param path filename to write to | 
|  | * @param opts the options | 
|  | * @return a new ORC file writer | 
|  | * @throws IOException | 
|  | */ | 
|  | public static Writer createWriter(Path path, | 
|  | WriterOptions opts | 
|  | ) throws IOException { | 
|  | FileSystem fs = opts.getFileSystem() == null ? | 
|  | path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem(); | 
|  |  | 
|  | return new WriterImpl(fs, path, opts); | 
|  | } | 
|  |  | 
|  | } |