| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef avro_DataFile_hh__ |
| #define avro_DataFile_hh__ |
| |
| #include "Config.hh" |
| #include "Encoder.hh" |
| #include "buffer/Buffer.hh" |
| #include "ValidSchema.hh" |
| #include "Specific.hh" |
| #include "Stream.hh" |
| |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| #include "array" |
| #include "boost/utility.hpp" |
| #include <boost/iostreams/filtering_stream.hpp> |
| |
| namespace avro { |
| |
| /** Specify type of compression to use when writing data files. */ |
| enum Codec { |
| NULL_CODEC, |
| DEFLATE_CODEC, |
| |
| #ifdef SNAPPY_CODEC_AVAILABLE |
| SNAPPY_CODEC |
| #endif |
| |
| }; |
| |
| const int SyncSize = 16; |
| /** |
| * The sync value. |
| */ |
| typedef std::array<uint8_t, SyncSize> DataFileSync; |
| |
| /** |
| * Type-independent portion of DataFileWriter. |
| * At any given point in time, at most one file can be written using |
| * this object. |
| */ |
| class AVRO_DECL DataFileWriterBase : boost::noncopyable { |
| const std::string filename_; |
| const ValidSchema schema_; |
| const EncoderPtr encoderPtr_; |
| const size_t syncInterval_; |
| Codec codec_; |
| |
| std::unique_ptr<OutputStream> stream_; |
| std::unique_ptr<OutputStream> buffer_; |
| const DataFileSync sync_; |
| int64_t objectCount_; |
| |
| typedef std::map<std::string, std::vector<uint8_t> > Metadata; |
| |
| Metadata metadata_; |
| |
| static std::unique_ptr<OutputStream> makeStream(const char* filename); |
| static DataFileSync makeSync(); |
| |
| void writeHeader(); |
| void setMetadata(const std::string& key, const std::string& value); |
| |
| /** |
| * Generates a sync marker in the file. |
| */ |
| void sync(); |
| |
| /** |
| * Shared constructor portion since we aren't using C++11 |
| */ |
| void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec); |
| |
| public: |
| /** |
| * Returns the current encoder for this writer. |
| */ |
| Encoder& encoder() const { return *encoderPtr_; } |
| |
| /** |
| * Returns true if the buffer has sufficient data for a sync to be |
| * inserted. |
| */ |
| void syncIfNeeded(); |
| |
| /** |
| * Increments the object count. |
| */ |
| void incr() { |
| ++objectCount_; |
| } |
| /** |
| * Constructs a data file writer with the given sync interval and name. |
| */ |
| DataFileWriterBase(const char* filename, const ValidSchema& schema, |
| size_t syncInterval, Codec codec = NULL_CODEC); |
| DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, |
| const ValidSchema& schema, size_t syncInterval, Codec codec); |
| |
| ~DataFileWriterBase(); |
| /** |
| * Closes the current file. Once closed this datafile object cannot be |
| * used for writing any more. |
| */ |
| void close(); |
| |
| /** |
| * Returns the schema for this data file. |
| */ |
| const ValidSchema& schema() const { return schema_; } |
| |
| /** |
| * Flushes any unwritten data into the file. |
| */ |
| void flush(); |
| }; |
| |
| /** |
| * An Avro datafile that can store objects of type T. |
| */ |
| template <typename T> |
| class DataFileWriter : boost::noncopyable { |
| std::unique_ptr<DataFileWriterBase> base_; |
| public: |
| /** |
| * Constructs a new data file. |
| */ |
| DataFileWriter(const char* filename, const ValidSchema& schema, |
| size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : |
| base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { } |
| |
| DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema& schema, |
| size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : |
| base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) { } |
| |
| /** |
| * Writes the given piece of data into the file. |
| */ |
| void write(const T& datum) { |
| base_->syncIfNeeded(); |
| avro::encode(base_->encoder(), datum); |
| base_->incr(); |
| } |
| |
| /** |
| * Closes the current file. Once closed this datafile object cannot be |
| * used for writing any more. |
| */ |
| void close() { base_->close(); } |
| |
| /** |
| * Returns the schema for this data file. |
| */ |
| const ValidSchema& schema() const { return base_->schema(); } |
| |
| /** |
| * Flushes any unwritten data into the file. |
| */ |
| void flush() { base_->flush(); } |
| }; |
| |
| /** |
| * The type independent portion of rader. |
| */ |
| class AVRO_DECL DataFileReaderBase : boost::noncopyable { |
| const std::string filename_; |
| const std::unique_ptr<InputStream> stream_; |
| const DecoderPtr decoder_; |
| int64_t objectCount_; |
| bool eof_; |
| Codec codec_; |
| int64_t blockStart_; |
| int64_t blockEnd_; |
| |
| ValidSchema readerSchema_; |
| ValidSchema dataSchema_; |
| DecoderPtr dataDecoder_; |
| std::unique_ptr<InputStream> dataStream_; |
| typedef std::map<std::string, std::vector<uint8_t> > Metadata; |
| |
| Metadata metadata_; |
| DataFileSync sync_; |
| |
| // for compressed buffer |
| std::unique_ptr<boost::iostreams::filtering_istream> os_; |
| std::vector<char> compressed_; |
| std::string uncompressed; |
| void readHeader(); |
| |
| void readDataBlock(); |
| void doSeek(int64_t position); |
| public: |
| /** |
| * Returns the current decoder for this reader. |
| */ |
| Decoder& decoder() { return *dataDecoder_; } |
| |
| /** |
| * Returns true if and only if there is more to read. |
| */ |
| bool hasMore(); |
| |
| /** |
| * Decrements the number of objects yet to read. |
| */ |
| void decr() { --objectCount_; } |
| |
| /** |
| * Constructs the reader for the given file and the reader is |
| * expected to use the schema that is used with data. |
| * This function should be called exactly once after constructing |
| * the DataFileReaderBase object. |
| */ |
| DataFileReaderBase(const char* filename); |
| |
| DataFileReaderBase(std::unique_ptr<InputStream> inputStream); |
| |
| /** |
| * Initializes the reader so that the reader and writer schemas |
| * are the same. |
| */ |
| void init(); |
| |
| /** |
| * Initializes the reader to read objects according to the given |
| * schema. This gives an opportinity for the reader to see the schema |
| * in the data file before deciding the right schema to use for reading. |
| * This must be called exactly once after constructing the |
| * DataFileReaderBase object. |
| */ |
| void init(const ValidSchema& readerSchema); |
| |
| /** |
| * Returns the schema for this object. |
| */ |
| const ValidSchema& readerSchema() { return readerSchema_; } |
| |
| /** |
| * Returns the schema stored with the data file. |
| */ |
| const ValidSchema& dataSchema() { return dataSchema_; } |
| |
| /** |
| * Closes the reader. No further operation is possible on this reader. |
| */ |
| void close(); |
| |
| /** |
| * Move to a specific, known synchronization point, for example one returned |
| * from tell() after sync(). |
| */ |
| void seek(int64_t position); |
| |
| /** |
| * Move to the next synchronization point after a position. To process a |
| * range of file entires, call this with the starting position, then check |
| * pastSync() with the end point before each use of decoder(). |
| */ |
| void sync(int64_t position); |
| |
| /** |
| * Return true if past the next synchronization point after a position. |
| */ |
| bool pastSync(int64_t position); |
| |
| /** |
| * Return the last synchronization point before our current position. |
| */ |
| int64_t previousSync(); |
| }; |
| |
| /** |
| * Reads the contents of data file one after another. |
| */ |
| template <typename T> |
| class DataFileReader : boost::noncopyable { |
| std::unique_ptr<DataFileReaderBase> base_; |
| public: |
| /** |
| * Constructs the reader for the given file and the reader is |
| * expected to use the given schema. |
| */ |
| DataFileReader(const char* filename, const ValidSchema& readerSchema) : |
| base_(new DataFileReaderBase(filename)) { |
| base_->init(readerSchema); |
| } |
| |
| DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema& readerSchema) : |
| base_(new DataFileReaderBase(std::move(inputStream))) { |
| base_->init(readerSchema); |
| } |
| |
| /** |
| * Constructs the reader for the given file and the reader is |
| * expected to use the schema that is used with data. |
| */ |
| DataFileReader(const char* filename) : |
| base_(new DataFileReaderBase(filename)) { |
| base_->init(); |
| } |
| |
| DataFileReader(std::unique_ptr<InputStream> inputStream) : |
| base_(new DataFileReaderBase(std::move(inputStream))) { |
| base_->init(); |
| } |
| |
| /** |
| * Constructs a reader using the reader base. This form of constructor |
| * allows the user to examine the schema of a given file and then |
| * decide to use the right type of data to be desrialize. Without this |
| * the user must know the type of data for the template _before_ |
| * he knows the schema within the file. |
| * The schema present in the data file will be used for reading |
| * from this reader. |
| */ |
| DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) { |
| base_->init(); |
| } |
| |
| /** |
| * Constructs a reader using the reader base. This form of constructor |
| * allows the user to examine the schema of a given file and then |
| * decide to use the right type of data to be desrialize. Without this |
| * the user must know the type of data for the template _before_ |
| * he knows the schema within the file. |
| * The argument readerSchema will be used for reading |
| * from this reader. |
| */ |
| DataFileReader(std::unique_ptr<DataFileReaderBase> base, |
| const ValidSchema& readerSchema) : base_(std::move(base)) { |
| base_->init(readerSchema); |
| } |
| |
| /** |
| * Reads the next entry from the data file. |
| * \return true if an object has been successfully read into \p datum and |
| * false if there are no more entries in the file. |
| */ |
| bool read(T& datum) { |
| if (base_->hasMore()) { |
| base_->decr(); |
| avro::decode(base_->decoder(), datum); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Returns the schema for this object. |
| */ |
| const ValidSchema& readerSchema() { return base_->readerSchema(); } |
| |
| /** |
| * Returns the schema stored with the data file. |
| */ |
| const ValidSchema& dataSchema() { return base_->dataSchema(); } |
| |
| /** |
| * Closes the reader. No further operation is possible on this reader. |
| */ |
| void close() { return base_->close(); } |
| |
| /** |
| * Move to a specific, known synchronization point, for example one returned |
| * from previousSync(). |
| */ |
| void seek(int64_t position) { base_->seek(position); } |
| |
| /** |
| * Move to the next synchronization point after a position. To process a |
| * range of file entires, call this with the starting position, then check |
| * pastSync() with the end point before each call to read(). |
| */ |
| void sync(int64_t position) { base_->sync(position); } |
| |
| /** |
| * Return true if past the next synchronization point after a position. |
| */ |
| bool pastSync(int64_t position) { return base_->pastSync(position); } |
| |
| /** |
| * Return the last synchronization point before our current position. |
| */ |
| int64_t previousSync() { return base_->previousSync(); } |
| }; |
| |
| } // namespace avro |
| #endif |