blob: 5c34e427ded93e3e9568a056238773bf65c81e64 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fstream>
#include <functional>
#include <limits>
#include <memory>
#include "runtime/mem-tracker.h"
#include "runtime/outbound-row-batch.h"
#include "util/runtime-profile.h"
#include "common/status.h"
// Only the name is needed in this header: kudu::WritableFile is referenced solely as
// the pointee of a std::unique_ptr member, so its full definition is not pulled in.
namespace kudu { class WritableFile; }
namespace impala {
class RowBatch;
class RuntimeState;
// Test fixture granted access to TupleFileWriter's protected members. The name must
// match the 'friend class' declaration inside TupleFileWriter.
class TupleFileReadWriteTest;

/// Callback that TupleFileWriter invokes with a requested write size before each
/// write. Returning a non-OK Status makes the corresponding Write() fail.
/// NOTE(review): the exact meaning of the size_t argument (incremental vs. cumulative
/// bytes) is defined by the implementation, which is not visible in this header.
using RequestWriteSizeCb = std::function<Status (size_t)>;
/// The TupleFileWriter is used to serialize a stream of RowBatches to a local file
/// for the tuple cache. It uses the standard RowBatch serialization used for KRPC
/// data streams (i.e. RowBatch::Serialize()). The files can be read back using the
/// TupleFileReader.
///
/// The TupleFileWriter writes to a temporary location and then moves the file
/// into its final location when Commit() is called. Commit() is the only way that a
/// file will persist over time. If the TupleFileWriter is destructed without calling
/// Commit(), it runs Abort() and any associated file is deleted. The user can
/// proactively call Abort() to delete any associated files, but it is not required.
///
/// The TupleFileWriter calls the request_write_size_cb requesting a new write size
/// before each write and will fail Write() if this callback returns an error.
/// BytesWritten() lets the caller find out how many bytes have been written, for
/// accounting purposes.
///
/// Currently, the TupleFileWriter does not embed the actual tuple layout into the
/// file. It relies on the corresponding TupleFileReader reading with the same
/// tuple layout. This will be modified later to embed a representation of the tuple
/// layout into the file.
class TupleFileWriter {
 public:
  /// 'path' is the final destination of the file once Commit() succeeds.
  /// 'request_write_size_cb' is optional; see the class comment for when it is called.
  /// NOTE(review): 'parent' presumably becomes the parent of the internal MemTracker
  /// and 'profile' presumably receives the writer's counters - confirm against the
  /// implementation.
  TupleFileWriter(std::string path, MemTracker* parent, RuntimeProfile* profile,
      RequestWriteSizeCb request_write_size_cb = nullptr);

  /// Runs Abort() if Commit() was never called, deleting any associated file.
  ~TupleFileWriter();

  /// The writer exclusively owns its temporary file and serialization buffers (held in
  /// std::unique_ptr members), so it was never copyable. Spell that out explicitly.
  TupleFileWriter(const TupleFileWriter&) = delete;
  TupleFileWriter& operator=(const TupleFileWriter&) = delete;

  /// Prepares the writer for Write() calls; this is where the state moves from
  /// Uninitialized to InProgress.
  Status Open(RuntimeState* state);

  /// Writes a row batch to file. This holds no references to memory from the RowBatch.
  /// If Write() returns a non-OK Status, it is not recoverable and the caller should
  /// not call Write() or Commit().
  Status Write(RuntimeState* state, RowBatch* row_batch);

  /// Number of bytes written to file. Must be called before Commit/Abort.
  size_t BytesWritten() const;

  /// Stop writing and delete any written data.
  void Abort();

  /// Ensure data is available for future reads.
  Status Commit(RuntimeState* state);

 protected:
  friend class TupleFileReadWriteTest;

  /// Path of the temporary file used while writing.
  /// NOTE(review): presumably derived from path_ and temp_suffix_ - confirm.
  std::string TempPath() const;

 private:
  /// Destination path.
  std::string path_;

  /// Suffix for temporary filename during writing.
  std::string temp_suffix_;

  /// MemTracker for OutboundRowBatches.
  std::shared_ptr<MemTracker> tracker_;

  /// Allocator (using tracker_) for OutboundRowBatches.
  std::unique_ptr<CharMemTrackerAllocator> allocator_;

  /// Buffer used for serializing row batches.
  std::unique_ptr<OutboundRowBatch> out_batch_;

  /// The counters below default to nullptr so they never hold an indeterminate value;
  /// the implementation is expected to assign them.

  /// Total write time by the writer.
  RuntimeProfile::Counter* write_timer_ = nullptr;

  /// Total time spent on serialization.
  RuntimeProfile::Counter* serialize_timer_ = nullptr;

  /// Total bytes written.
  RuntimeProfile::Counter* bytes_written_ = nullptr;

  /// Callback to request an increase to the write size.
  RequestWriteSizeCb request_write_size_cb_;

  /// This writes to a temporary file, only moving it into the final location with
  /// Commit(). tmp_file_ is the file abstraction used for writing the temporary file.
  std::unique_ptr<kudu::WritableFile> tmp_file_;

  /// The writer starts as Uninitialized. Once it transitions to InProgress in Open(),
  /// it will end up Aborted unless the caller runs Commit().
  enum class State {
    Uninitialized,
    InProgress,
    Committed,
    Aborted
  };

  /// Stream insertion for State values (e.g. for logging and check messages).
  friend
  std::ostream& operator<<(std::ostream& out, const TupleFileWriter::State& state);

  State state_ = State::Uninitialized;
};
}