blob: 4823e1b18a7235f8ef0d37aa76c800cdd870139d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef avro_Stream_hh__
#define avro_Stream_hh__
#include <stdint.h>
#include <string.h>

#include <iosfwd>
#include <memory>
#include <vector>

#include "boost/utility.hpp"

#include "Config.hh"
#include "Exception.hh"
namespace avro {
/**
 * Abstract source of bytes that hands out memory owned by the stream
 * (no copy into caller-supplied buffers).
 */
class AVRO_DECL InputStream : boost::noncopyable {
protected:
    /**
     * Only derived classes may construct an InputStream.
     */
    InputStream() = default;

public:
    /**
     * Virtual so that concrete streams can be destroyed through this base.
     */
    virtual ~InputStream() = default;

    /**
     * Hands out the next chunk of available data.
     *
     * On success *data points into memory owned by the stream and *len holds
     * the number of bytes available there.
     *
     * @return true if some data is available; false if the stream is
     *         exhausted or an error has occurred.
     */
    virtual bool next(const uint8_t** data, size_t* len) = 0;

    /**
     * Gives back the trailing len bytes of the chunk obtained from the most
     * recent call to next(). len must not exceed the size of that chunk.
     */
    virtual void backup(size_t len) = 0;

    /**
     * Advances the stream by len bytes without handing them out.
     */
    virtual void skip(size_t len) = 0;

    /**
     * Number of bytes consumed from this stream so far. Every byte handed out
     * by next() counts as consumed unless it was given back via backup().
     */
    virtual size_t byteCount() const = 0;
};

using InputStreamPtr = std::unique_ptr<InputStream>;
/**
 * An InputStream that can additionally be repositioned to an absolute offset.
 */
class AVRO_DECL SeekableInputStream : public InputStream {
protected:
    /**
     * Only derived classes may construct a SeekableInputStream.
     */
    SeekableInputStream() = default;

public:
    /**
     * Virtual (inherited from InputStream) so concrete streams can be
     * destroyed through this base.
     */
    ~SeekableInputStream() override = default;

    /**
     * Repositions the stream at the given absolute offset.
     *
     * Pointers previously obtained from next() may no longer be valid
     * afterwards, and byteCount() is reset to position.
     */
    virtual void seek(int64_t position) = 0;
};

using SeekableInputStreamPtr = std::unique_ptr<SeekableInputStream>;
/**
 * Abstract sink of bytes that lends out its own buffers for the caller to
 * fill (no copy from caller-supplied memory).
 */
class AVRO_DECL OutputStream : boost::noncopyable {
protected:
    /**
     * Only derived classes may construct an OutputStream.
     */
    OutputStream() = default;

public:
    /**
     * Virtual so that concrete streams can be destroyed through this base.
     */
    virtual ~OutputStream() = default;

    /**
     * Obtains a writable buffer owned by the stream.
     *
     * On success *data points at the buffer and *len holds the number of
     * bytes that may be written there.
     *
     * @return true on success, false if no buffer could be provided.
     */
    virtual bool next(uint8_t** data, size_t* len) = 0;

    /**
     * Gives back the trailing len bytes of the buffer obtained from the most
     * recent call to next(); those bytes do not count as written.
     */
    virtual void backup(size_t len) = 0;

    /**
     * Number of bytes written into this stream so far. Every byte of every
     * buffer handed out by next() counts as written unless it was given back
     * via backup().
     */
    virtual uint64_t byteCount() const = 0;

    /**
     * Pushes any buffered data through to the stream's underlying store,
     * if it has one.
     */
    virtual void flush() = 0;
};

using OutputStreamPtr = std::unique_ptr<OutputStream>;
/**
 * Returns a new OutputStream that keeps its contents in memory, growing in
 * chunks of chunkSize bytes.
 */
AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize = 4 * 1024);

/**
 * Returns a new InputStream that reads from the given byte array.
 *
 * The data is not copied: the array must remain valid for as long as the
 * returned InputStream is in use.
 */
AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t* data, size_t len);

/**
 * Returns a new InputStream over the contents written into an output stream.
 *
 * source must have been returned by an earlier call to memoryOutputStream().
 * The new input stream sees a snapshot of source's contents. Any number of
 * memory input streams may be created from a single memory output stream.
 */
AVRO_DECL InputStreamPtr memoryInputStream(const OutputStream& source);

/**
 * Returns the contents written so far into source.
 *
 * source must be a memory output stream, i.e. it must have been returned by
 * an earlier call to memoryOutputStream().
 */
AVRO_DECL std::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source);

/**
 * Returns a new OutputStream whose contents are stored in the named file.
 * Data is written in chunks of bufferSize bytes.
 *
 * If a file with the given name exists, it is truncated and overwritten;
 * otherwise it is created.
 */
AVRO_DECL OutputStreamPtr fileOutputStream(const char* filename,
    size_t bufferSize = 8 * 1024);

/**
 * Returns a new InputStream whose contents come from the named file.
 * Data is read in chunks of bufferSize bytes.
 */
AVRO_DECL InputStreamPtr fileInputStream(
    const char *filename, size_t bufferSize = 8 * 1024);

/**
 * Returns a new SeekableInputStream whose contents come from the named file,
 * so that, unlike fileInputStream(), the result also supports seek().
 * bufferSize presumably plays the same role as in fileInputStream()
 * (read chunk size) — TODO confirm against the implementation.
 */
AVRO_DECL SeekableInputStreamPtr fileSeekableInputStream(
    const char *filename, size_t bufferSize = 8 * 1024);

/**
 * Returns a new OutputStream whose contents are sent to the given
 * std::ostream. os must outlive the returned OutputStream.
 */
AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream& os,
    size_t bufferSize = 8 * 1024);

/**
 * Returns a new InputStream whose contents come from the given
 * std::istream. in must outlive the returned InputStream.
 */
AVRO_DECL InputStreamPtr istreamInputStream(
    std::istream &in, size_t bufferSize = 8 * 1024);

/**
 * Returns a new InputStream whose contents come from the given
 * std::istream, for streams that do not support seekg (e.g. compressed
 * streams).
 *
 * Use this instead of istreamInputStream() only when seekg does not work.
 * The returned InputStream skips by reading bytes rather than seeking, which
 * costs performance.
 *
 * is must outlive the returned InputStream.
 */
AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(
    std::istream& is, size_t bufferSize = 8 * 1024);
/**
 * A convenience class for reading from an InputStream.
 *
 * It holds one chunk obtained from the stream's next() in [next_, end_) and
 * serves reads from it, fetching a new chunk when it runs out.
 */
struct StreamReader {
    /**
     * The underlying input stream.
     */
    InputStream* in_;
    /**
     * The next location to read from.
     */
    const uint8_t* next_;
    /**
     * One past the last valid location.
     */
    const uint8_t* end_;

    /**
     * Constructs an empty reader with no underlying stream.
     */
    StreamReader() : in_(0), next_(0), end_(0) { }

    /**
     * Constructs a reader with the given underlying stream.
     */
    StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }

    /**
     * Replaces the current input stream with the given one. Unread bytes of
     * the current chunk, if any, are first given back to the previous
     * stream via backup().
     */
    void reset(InputStream& is) {
        if (in_ != 0 && end_ != next_) {
            in_->backup(end_ - next_);
        }
        in_ = &is;
        next_ = end_ = 0;
    }

    /**
     * Reads a single byte from the underlying stream.
     * @throws Exception if no more data is available.
     */
    uint8_t read() {
        if (next_ == end_) {
            more();
        }
        return *next_++;
    }

    /**
     * Reads exactly n bytes from the underlying stream into b.
     * @throws Exception if the stream ends before n bytes were read.
     */
    void readBytes(uint8_t* b, size_t n) {
        while (n > 0) {
            if (next_ == end_) {
                more();
            }
            size_t q = end_ - next_;
            if (q > n) {
                q = n;
            }
            ::memcpy(b, next_, q);
            next_ += q;
            b += q;
            n -= q;
        }
    }

    /**
     * Skips the given number of bytes. Bytes beyond the current chunk are
     * skipped via the underlying stream's skip(). If there are not that
     * many bytes, the underlying stream presumably throws — TODO confirm
     * per InputStream implementation.
     */
    void skipBytes(size_t n) {
        if (n > static_cast<size_t>(end_ - next_)) {
            n -= end_ - next_;
            next_ = end_;
            in_->skip(n);
        } else {
            next_ += n;
        }
    }

    /**
     * Gets as many bytes from the underlying stream as possible in a single
     * chunk, skipping over any zero-length chunks.
     *
     * The chunk is fetched into locals and committed to next_/end_ only when
     * it is non-empty. Writing straight into next_ would let a zero-length
     * chunk followed by end-of-stream leave next_ != end_, pointing at
     * memory that holds no data.
     *
     * @return true if some data could be obtained; false if no more data is
     *         available on the stream (next_ and end_ are then unchanged).
     */
    bool fill() {
        const uint8_t* data = 0;
        size_t n = 0;
        while (in_->next(&data, &n)) {
            if (n != 0) {
                next_ = data;
                end_ = data + n;
                return true;
            }
        }
        return false;
    }

    /**
     * Tries to get more data.
     * @throws Exception if no more data is available.
     */
    void more() {
        if (! fill()) {
            throw Exception("EOF reached");
        }
    }

    /**
     * Returns true if and only if the end of stream has not been reached.
     */
    bool hasMore() {
        return (next_ == end_) ? fill() : true;
    }

    /**
     * Gives the unread bytes of the current chunk back to the underlying
     * stream. If unRead is true, the last byte read is pushed back as well.
     * In that case at least one byte must have been read from the current
     * chunk, otherwise next_ would step before the chunk's start.
     */
    void drain(bool unRead) {
        if (unRead) {
            --next_;
        }
        in_->backup(end_ - next_);
        end_ = next_;
    }
};
/**
 * A convenience class to write data into an OutputStream.
 *
 * It borrows a buffer from the stream via next(), fills [next_, end_), and
 * gives the unused tail back via backup() on flush() or reset().
 */
struct StreamWriter {
    /**
     * The underlying output stream for this writer.
     */
    OutputStream* out_;
    /**
     * The next location to write to.
     */
    uint8_t* next_;
    /**
     * One past the last location one can write to.
     */
    uint8_t* end_;

    /**
     * Constructs a writer with no underlying stream.
     */
    StreamWriter() : out_(0), next_(0), end_(0) { }

    /**
     * Constructs a new writer with the given underlying stream.
     */
    StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }

    /**
     * Replaces the current underlying stream with a new one. Unused bytes of
     * the current buffer, if any, are first given back to the previous
     * stream via backup().
     */
    void reset(OutputStream& os) {
        if (out_ != 0 && end_ != next_) {
            out_->backup(end_ - next_);
        }
        out_ = &os;
        // Forget the old buffer entirely (as StreamReader::reset does) so no
        // pointer into the previous stream's memory survives the switch.
        next_ = end_ = 0;
    }

    /**
     * Writes a single byte.
     * @throws Exception if the stream cannot provide more space.
     */
    void write(uint8_t c) {
        if (next_ == end_) {
            more();
        }
        *next_++ = c;
    }

    /**
     * Writes the n bytes starting at \p b.
     * @throws Exception if the stream cannot provide more space.
     */
    void writeBytes(const uint8_t* b, size_t n) {
        while (n > 0) {
            if (next_ == end_) {
                more();
            }
            size_t q = end_ - next_;
            if (q > n) {
                q = n;
            }
            ::memcpy(next_, b, q);
            next_ += q;
            b += q;
            n -= q;
        }
    }

    /**
     * Gives the unwritten tail of the current buffer back to the stream and
     * then flushes the underlying stream.
     */
    void flush() {
        if (next_ != end_) {
            out_->backup(end_ - next_);
            next_ = end_;
        }
        out_->flush();
    }

    /**
     * Returns the number of bytes written so far. For a meaningful result,
     * call this after flush(): until then the unused tail of the current
     * buffer still counts as written by the underlying stream.
     */
    int64_t byteCount() const {
        // OutputStream::byteCount() is unsigned; the conversion is explicit.
        return static_cast<int64_t>(out_->byteCount());
    }

    /**
     * Gets more space to write to, skipping over zero-length buffers.
     *
     * The buffer is fetched into locals and committed to next_/end_ only when
     * it is non-empty. Writing straight into next_ would let a zero-length
     * buffer followed by failure leave next_ != end_. A later flush() or
     * reset() would then call backup() with a bogus length.
     *
     * @throws Exception if the stream cannot provide more space
     *         (next_ and end_ are then unchanged).
     */
    void more() {
        uint8_t* buf = 0;
        size_t n = 0;
        while (out_->next(&buf, &n)) {
            if (n != 0) {
                next_ = buf;
                end_ = buf + n;
                return;
            }
        }
        throw Exception("EOF reached");
    }
};
/**
 * Copies everything remaining in an input stream into an output stream, then
 * flushes the output stream.
 *
 * Chunks are pulled from in.next() until it reports no more data. Each chunk
 * is written through a StreamWriter over out.
 */
inline void copy(InputStream& in, OutputStream& out)
{
    StreamWriter writer(out);
    const uint8_t* chunk = 0;
    size_t chunkLen = 0;
    for (;;) {
        if (!in.next(&chunk, &chunkLen)) {
            break;
        }
        writer.writeBytes(chunk, chunkLen);
    }
    writer.flush();
}
} // namespace avro
#endif