blob: d73b18ba98f42f285f00eca8f88b0b59f30a375b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef avro_DataFile_hh__
#define avro_DataFile_hh__
#include "Encoder.hh"
#include "buffer/Buffer.hh"
#include "ValidSchema.hh"
#include "Specific.hh"
#include "Stream.hh"
#include <map>
#include <string>
#include <vector>
#include "boost/array.hpp"
#include "boost/utility.hpp"
namespace avro {
/**
 * Fixed-size array of 16 bytes. Used as the type of the sync_ members
 * of DataFileWriterBase and DataFileReaderBase.
 */
typedef boost::array<uint8_t, 16> DataFileSync;
/**
* Type-independent portion of DataFileWriter.
* At any given point in time, at most one file can be written using
* this object.
*/
class DataFileWriterBase : boost::noncopyable {
const std::string filename_;
const ValidSchema schema_;
const EncoderPtr encoderPtr_;
const size_t syncInterval_;
std::auto_ptr<OutputStream> stream_;
std::auto_ptr<OutputStream> buffer_;
const DataFileSync sync_;
int64_t objectCount_;
typedef std::map<std::string, std::vector<uint8_t> > Metadata;
Metadata metadata_;
static std::auto_ptr<OutputStream> makeStream(const char* filename);
static DataFileSync makeSync();
void writeHeader();
void setMetadata(const std::string& key, const std::string& value);
/**
* Generates a sync marker in the file.
*/
void sync();
public:
Encoder& encoder() const { return *encoderPtr_; }
void syncIfNeeded();
void incr() {
++objectCount_;
}
/**
* Constructs a data file writer with the given sync interval and name.
*/
DataFileWriterBase(const char* filename, const ValidSchema& schema,
size_t syncInterval);
~DataFileWriterBase();
/**
* Closes the current file. Once closed this datafile object cannot be
* used for writing any more.
*/
void close();
/**
* Returns the schema for this data file.
*/
const ValidSchema& schema() const { return schema_; }
/**
* Flushes any unwritten data into the file.
*/
void flush();
};
/**
 * Writer for an Avro data file holding objects of type T.
 */
template <typename T>
class DataFileWriter : private boost::noncopyable {
private:
    std::auto_ptr<DataFileWriterBase> base_;

public:
    /**
     * Creates a new data file with the given name and schema.
     * The sync interval defaults to 16 * 1024.
     */
    DataFileWriter(const char* filename, const ValidSchema& schema,
        size_t syncInterval = 16 * 1024)
        : base_(new DataFileWriterBase(filename, schema, syncInterval))
    {
    }

    /**
     * Appends the given datum to the file.
     */
    void write(const T& datum)
    {
        DataFileWriterBase& writer = *base_;
        writer.syncIfNeeded();
        avro::encode(writer.encoder(), datum);
        writer.incr();
    }

    /**
     * Pushes any data not yet written out into the file.
     */
    void flush()
    {
        base_->flush();
    }

    /**
     * Gives access to the schema of this data file.
     */
    const ValidSchema& schema() const
    {
        return base_->schema();
    }

    /**
     * Closes the file currently being written. After this call the
     * object can no longer be used for writing.
     */
    void close()
    {
        base_->close();
    }
};
/**
 * Type-independent portion of DataFileReader.
 */
class DataFileReaderBase : boost::noncopyable {
    const std::string filename_;
    const std::auto_ptr<InputStream> stream_;
    const DecoderPtr decoder_;
    int64_t objectCount_;
    ValidSchema readerSchema_;
    ValidSchema dataSchema_;
    DecoderPtr dataDecoder_;
    std::auto_ptr<InputStream> dataStream_;
    typedef std::map<std::string, std::vector<uint8_t> > Metadata;
    Metadata metadata_;
    DataFileSync sync_;

    void readHeader();
    bool readDataBlock();

public:
    /**
     * Returns the decoder referred to by dataDecoder_.
     * DataFileReader::read() decodes each datum through it.
     */
    Decoder& decoder() { return *dataDecoder_; }

    /**
     * Returns true if and only if there is more to read.
     */
    bool hasMore();

    /**
     * Decrements the object counter.
     * NOTE(review): the counter presumably tracks the objects still
     * unread in the current data block; confirm in the implementation.
     */
    void decr() { --objectCount_; }

    /**
     * Constructs the reader for the given file. One of the init()
     * overloads is expected to be called exactly once after
     * construction.
     */
    DataFileReaderBase(const char* filename);

    /**
     * Initializes the reader so that the reader and writer schemas
     * are the same, i.e. the reader uses the schema that is stored
     * with the data.
     */
    void init();

    /**
     * Initializes the reader to read objects according to the given
     * schema. This gives an opportunity for the reader to see the schema
     * in the data file before deciding the right schema to use for reading.
     * This must be called exactly once after constructing the
     * DataFileReaderBase object.
     */
    void init(const ValidSchema& readerSchema);

    /**
     * Returns the schema for this object.
     */
    const ValidSchema& readerSchema() const { return readerSchema_; }

    /**
     * Returns the schema stored with the data file.
     */
    const ValidSchema& dataSchema() const { return dataSchema_; }

    /**
     * Closes the reader. No further operation is possible on this reader.
     */
    void close();
};
/**
 * Reads objects of type T from an Avro data file.
 */
template <typename T>
class DataFileReader : boost::noncopyable {
    std::auto_ptr<DataFileReaderBase> base_;

public:
    /**
     * Constructs the reader for the given file and the reader is
     * expected to use the given schema.
     */
    DataFileReader(const char* filename, const ValidSchema& readerSchema) :
        base_(new DataFileReaderBase(filename)) {
        base_->init(readerSchema);
    }

    /**
     * Constructs the reader for the given file and the reader is
     * expected to use the schema that is used with data.
     */
    DataFileReader(const char* filename) :
        base_(new DataFileReaderBase(filename)) {
        base_->init();
    }

    /**
     * Constructs a reader using the reader base, taking ownership of it.
     * This form of constructor allows the user to examine the schema of
     * a given file and then decide on the right type of data to be
     * deserialized. Without this the user must know the type of data for
     * the template _before_ knowing the schema within the file.
     * The schema present in the data file will be used for reading
     * from this reader.
     */
    DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
        base_->init();
    }

    /**
     * Constructs a reader using the reader base, taking ownership of it.
     * This form of constructor allows the user to examine the schema of
     * a given file and then decide on the right type of data to be
     * deserialized. Without this the user must know the type of data for
     * the template _before_ knowing the schema within the file.
     * The argument readerSchema will be used for reading
     * from this reader.
     */
    DataFileReader(std::auto_ptr<DataFileReaderBase> base,
        const ValidSchema& readerSchema) : base_(base) {
        base_->init(readerSchema);
    }

    /**
     * Reads the next object from the file into datum.
     * Returns true if an object was decoded, and false, leaving datum
     * untouched, if there is nothing more to read.
     */
    bool read(T& datum) {
        if (base_->hasMore()) {
            // The counter is decremented before decoding, as in the
            // original code.
            base_->decr();
            avro::decode(base_->decoder(), datum);
            return true;
        }
        return false;
    }

    /**
     * Returns the schema for this object.
     */
    const ValidSchema& readerSchema() const { return base_->readerSchema(); }

    /**
     * Returns the schema stored with the data file.
     */
    const ValidSchema& dataSchema() const { return base_->dataSchema(); }

    /**
     * Closes the reader. No further operation is possible on this reader.
     */
    void close() { base_->close(); }
};
} // namespace avro
#endif