blob: c98816bb0eb015e5623996b4ed74c95ba7b9ea23 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_COLLECTION_VALUE_BUILDER_H
#define IMPALA_RUNTIME_COLLECTION_VALUE_BUILDER_H
#include "runtime/collection-value.h"
#include "runtime/mem-tracker.h"
#include "runtime/tuple.h"
#include "util/debug-util.h"
#include "util/ubsan.h"
namespace impala {
/// Class for constructing a CollectionValue when the total size isn't known
/// up-front. This class handles allocating the buffer backing the collection from a
/// MemPool, and uses a doubling strategy for growing the collection.
class CollectionValueBuilder {
public:
// I did not pick this default for any meaningful reason, feel free to change!
static const int DEFAULT_INITIAL_TUPLE_CAPACITY = 4;
CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc,
MemPool* pool, RuntimeState* state,
int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY)
: coll_value_(coll_value),
tuple_desc_(tuple_desc),
pool_(pool),
state_(state) {
buffer_size_ = initial_tuple_capacity * tuple_desc_.byte_size();
coll_value_->ptr = pool_->TryAllocate(buffer_size_);
if (coll_value_->ptr == NULL) buffer_size_ = 0;
}
/// Returns memory to write new tuples to in 'tuple_mem' and returns the maximum number
/// of tuples that may be written before calling CommitTuples() in 'num_tuples'. After
/// calling CommitTuples(), GetMemory() can be called again. Allocates if there is no
/// free tuple memory left. Returns error status if memory limit is exceeded.
Status GetFreeMemory(Tuple** tuple_mem, int* num_tuples) WARN_UNUSED_RESULT {
if (tuple_desc_.byte_size() == 0) {
// No tuple memory necessary, so caller can write as many tuples as 'num_tuples'
// field can count.
// TODO: num_tuples should be 64-bit. Need to update CollectionValue too.
*num_tuples = INT_MAX;
} else {
int64_t bytes_written = coll_value_->ByteSize(tuple_desc_);
DCHECK_GE(buffer_size_, bytes_written);
if (buffer_size_ == bytes_written) {
// Double tuple buffer
int64_t new_buffer_size =
std::max<int64_t>(buffer_size_ * 2, tuple_desc_.byte_size());
uint8_t* new_buf = pool_->TryAllocate(new_buffer_size);
if (UNLIKELY(new_buf == NULL)) {
*tuple_mem = NULL;
*num_tuples = 0;
string path = tuple_desc_.table_desc() == NULL ? "" :
PrintPath(*tuple_desc_.table_desc(), tuple_desc_.tuple_path());
return pool_->mem_tracker()->MemLimitExceeded(state_,
ErrorMsg(TErrorCode::COLLECTION_ALLOC_FAILED, new_buffer_size,
path, buffer_size_, coll_value_->num_tuples).msg(), new_buffer_size);
}
Ubsan::MemCpy(new_buf, coll_value_->ptr, bytes_written);
coll_value_->ptr = new_buf;
buffer_size_ = new_buffer_size;
}
*tuple_mem = reinterpret_cast<Tuple*>(coll_value_->ptr + bytes_written);
*num_tuples = (buffer_size_ - bytes_written) / tuple_desc_.byte_size();
DCHECK_EQ((buffer_size_ - bytes_written) % tuple_desc_.byte_size(), 0);
DCHECK_GT(*num_tuples, 0);
}
return Status::OK();
}
/// Adds 'num_tuples' to the size of the collection. 'num_tuples' must be <= the last
/// value returned by GetMemory().
void CommitTuples(int num_tuples) {
coll_value_->num_tuples += num_tuples;
DCHECK_LE(coll_value_->ByteSize(tuple_desc_), buffer_size_);
}
const TupleDescriptor& tuple_desc() const { return tuple_desc_; }
MemPool* pool() const { return pool_; }
private:
CollectionValue* coll_value_;
/// The tuple desc for coll_value_'s items
const TupleDescriptor& tuple_desc_;
/// The pool backing coll_value_'s buffer
MemPool* pool_;
/// May be NULL. If non-NULL, used to log memory limit errors.
RuntimeState* state_;
/// The current size of coll_value_'s buffer in bytes, including any unused space
/// (i.e. buffer_size_ is equal to or larger than coll_value_->ByteSize()).
int64_t buffer_size_;
};
}
#endif