blob: 48a501d5f4a7193e868e026cd6d71b4cba948dde [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Classes used internally by pb_util.h.
// This header should not be included by anything but pb_util and its tests.
#ifndef KUDU_UTIL_PB_UTIL_INTERNAL_H
#define KUDU_UTIL_PB_UTIL_INTERNAL_H
#include <cstddef>
#include <cstdint>
#include <memory>
#include <glog/logging.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/port.h"
#include "kudu/util/env.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
namespace kudu {
namespace pb_util {
namespace internal {
// Input Stream used by ParseFromSequentialFile()
class SequentialFileFileInputStream : public google::protobuf::io::ZeroCopyInputStream {
public:
explicit SequentialFileFileInputStream(SequentialFile *rfile,
size_t buffer_size = kDefaultBufferSize)
: buffer_used_(0), buffer_offset_(0),
buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]),
total_read_(0), rfile_(rfile) {
CHECK_GT(buffer_size, 0);
}
~SequentialFileFileInputStream() {
}
bool Next(const void **data, int *size) OVERRIDE;
bool Skip(int count) OVERRIDE;
void BackUp(int count) OVERRIDE {
CHECK_GE(count, 0);
CHECK_LE(count, buffer_offset_);
buffer_offset_ -= count;
total_read_ -= count;
}
int64_t ByteCount() const OVERRIDE {
return total_read_;
}
Status status() const {
return status_;
}
private:
static const size_t kDefaultBufferSize = 8192;
Status status_;
size_t buffer_used_;
size_t buffer_offset_;
const size_t buffer_size_;
std::unique_ptr<uint8_t[]> buffer_;
size_t total_read_;
SequentialFile *rfile_;
};
// Output Stream used by SerializeToWritableFile()
class WritableFileOutputStream : public google::protobuf::io::ZeroCopyOutputStream {
public:
explicit WritableFileOutputStream(WritableFile *wfile, size_t buffer_size = kDefaultBufferSize)
: buffer_offset_(0), buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]),
flushed_(0), wfile_(wfile) {
CHECK_GT(buffer_size, 0);
}
~WritableFileOutputStream() {
}
bool Flush() {
if (buffer_offset_ > 0) {
Slice data(buffer_.get(), buffer_offset_);
status_ = wfile_->Append(data);
flushed_ += buffer_offset_;
buffer_offset_ = 0;
}
return status_.ok();
}
bool Next(void **data, int *size) OVERRIDE;
void BackUp(int count) OVERRIDE {
CHECK_GE(count, 0);
CHECK_LE(count, buffer_offset_);
buffer_offset_ -= count;
}
int64_t ByteCount() const OVERRIDE {
return flushed_ + buffer_offset_;
}
private:
static const size_t kDefaultBufferSize = 8192;
Status status_;
size_t buffer_offset_;
const size_t buffer_size_;
std::unique_ptr<uint8_t[]> buffer_;
size_t flushed_;
WritableFile *wfile_;
};
} // namespace internal
} // namespace pb_util
} // namespace kudu
#endif