blob: 5de894db80f7f9938d556b8db498557fc5da2969 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_INLINE_H
#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_INLINE_H
#include "runtime/buffered-tuple-stream.h"
#include "runtime/descriptors.h"
#include "runtime/tuple-row.h"
#include "util/bit-util.h"
namespace impala {
inline int BufferedTupleStream::NullIndicatorBytesPerRow() const {
DCHECK(has_nullable_tuple_);
return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
}
inline uint8_t* BufferedTupleStream::AddRowCustomBegin(int64_t size, Status* status) {
DCHECK(!closed_);
DCHECK(has_write_iterator());
if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) {
return AddRowCustomBeginSlow(size, status);
}
DCHECK(write_page_ != nullptr);
DCHECK(write_page_->is_pinned());
DCHECK_LE(write_ptr_ + size, write_end_ptr_);
++num_rows_;
++write_page_->num_rows;
uint8_t* data = write_ptr_;
write_ptr_ += size;
return data;
}
inline void BufferedTupleStream::AddRowCustomEnd(int64_t size) {
if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size);
}
inline void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
DCHECK(row != nullptr);
DCHECK(!closed_);
DCHECK(is_pinned());
DCHECK(!read_it_.attach_on_read_);
uint8_t* data = flat_row;
return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
UnflattenTupleRow<false>(&data, row);
}
template <bool HAS_NULLABLE_TUPLE>
inline void BufferedTupleStream::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
const int tuples_per_row = desc_->tuple_descriptors().size();
uint8_t* ptr = *data;
if (HAS_NULLABLE_TUPLE) {
// Stitch together the tuples from the page and the NULL ones.
const uint8_t* null_indicators = ptr;
ptr += NullIndicatorBytesPerRow();
for (int i = 0; i < tuples_per_row; ++i) {
const uint8_t* null_word = null_indicators + (i >> 3);
const uint32_t null_pos = i & 7;
const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
row->SetTuple(
i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
ptr += fixed_tuple_sizes_[i] * is_not_null;
}
} else {
for (int i = 0; i < tuples_per_row; ++i) {
row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
ptr += fixed_tuple_sizes_[i];
}
}
*data = ptr;
}
}
#endif