// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-sequence-table-writer.h"
#include "exec/write-stream.inline.h"
#include "exec/exec-node.h"
#include "util/hdfs-util.h"
#include "util/uid-util.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "util/runtime-profile-counters.h"
#include <vector>
#include <hdfs.h>
#include <boost/scoped_ptr.hpp>
#include <stdlib.h>
#include "common/names.h"
namespace impala {
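
// 'S', 'E', 'Q' followed by a version byte identifies a Hadoop SequenceFile;
// 6 is the current version of the format. Rows are written as Text values
// with no keys.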
uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";

HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
RuntimeState* state, OutputPartition* output,
const HdfsPartitionDescriptor* partition,
const HdfsTableDescriptor* table_desc,
const vector<ExprContext*>& output_exprs)
: HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
unflushed_rows_(0), record_compression_(false) {
  // Flush in roughly 64MB chunks, matching the classic HDFS default block size.
  approx_block_size_ = 64 * 1024 * 1024;
  parent->mem_tracker()->Consume(approx_block_size_);
field_delim_ = partition->field_delim();
escape_char_ = partition->escape_char();
}
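
// Picks the compression codec and mode from the query options and generates
// the random sync marker that delimits flush units in the output file.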
Status HdfsSequenceTableWriter::Init() {
THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED;
const TQueryOptions& query_options = state_->query_options();
if (query_options.__isset.compression_codec) {
codec = query_options.compression_codec;
    if (codec == THdfsCompression::SNAPPY) {
      // In a sequence file (and, in general, anywhere a hadoop.io codec is
      // used) SNAPPY always means the blocked variant.
      codec = THdfsCompression::SNAPPY_BLOCKED;
    }
}
if (codec != THdfsCompression::NONE) {
compress_flag_ = true;
if (query_options.__isset.seq_compression_mode) {
record_compression_ =
query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD;
}
RETURN_IF_ERROR(Codec::GetHadoopCodecClassName(codec, &codec_name_));
RETURN_IF_ERROR(Codec::CreateCompressor(
mem_pool_.get(), true, codec_name_, &compressor_));
DCHECK(compressor_.get() != NULL);
}
  // Create the 16-byte sync marker from a random UUID. Where the marker is
  // written between records (see AppendRows()), it is preceded by a record
  // length of -1 so readers can tell it apart from a record.
string uuid = GenerateUUIDString();
uint8_t sync_neg1[20];
ReadWriteUtil::PutInt(sync_neg1, static_cast<uint32_t>(-1));
  DCHECK_EQ(uuid.size(), 16);
memcpy(sync_neg1 + sizeof(int32_t), uuid.data(), uuid.size());
neg1_sync_marker_ = string(reinterpret_cast<char*>(sync_neg1), 20);
sync_marker_ = uuid;
return Status::OK();
}
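
// Appends either all rows of 'batch' or the subset selected by
// 'row_group_indices', flushing once the buffered output reaches the target
// block size.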
Status HdfsSequenceTableWriter::AppendRows(
RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
int32_t limit;
if (row_group_indices.empty()) {
limit = batch->num_rows();
} else {
limit = row_group_indices.size();
}
COUNTER_ADD(parent_->rows_inserted_counter(), limit);
bool all_rows = row_group_indices.empty();
int num_non_partition_cols =
table_desc_->num_cols() - table_desc_->num_clustering_cols();
DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
{
SCOPED_TIMER(parent_->encode_timer());
if (all_rows) {
for (int row_idx = 0; row_idx < limit; ++row_idx) {
RETURN_IF_ERROR(ConsumeRow(batch->GetRow(row_idx)));
}
} else {
for (int row_idx = 0; row_idx < limit; ++row_idx) {
TupleRow* row = batch->GetRow(row_group_indices[row_idx]);
RETURN_IF_ERROR(ConsumeRow(row));
}
}
}
  if (!compress_flag_) {
    // Write a sync marker (preceded by the -1 length escape) after each batch
    // so readers can resynchronize mid-file.
    out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
  }
  if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
*new_file = false;
return Status::OK();
}
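
// Writes the SequenceFile header in the order readers expect it:
//   SEQ6_CODE            4 bytes: 'S', 'E', 'Q', 6
//   key class name       Text (empty; Impala writes no keys)
//   value class name     Text ("org.apache.hadoop.io.Text")
//   compressed?          boolean
//   block compressed?    boolean
//   codec class name     Text (present only if compressed)
//   metadata             int count, then count key/value Text pairs
//   sync marker          16 bytes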
Status HdfsSequenceTableWriter::WriteFileHeader() {
out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));
  // Output an empty key class name; Impala does not write keys.
  out_.WriteEmptyText();
  // Output the value class name.
  out_.WriteText(strlen(VALUE_CLASS_NAME),
      reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
  // Flag indicating whether compression is used.
  out_.WriteBoolean(compress_flag_);
  // Only meaningful if compression is used: indicates whether block (as
  // opposed to record) compression is used.
  out_.WriteBoolean(!record_compression_);
// Output the name of our compression codec, parsed by readers
if (compress_flag_) {
out_.WriteText(codec_name_.size(),
reinterpret_cast<const uint8_t*>(codec_name_.data()));
}
  // Metadata is formatted as an integer N followed by N*2 strings, which are
  // key-value pairs. Hive does not write metadata, so neither does Impala.
out_.WriteInt(0);
  // Write the sync marker that terminates the header.
out_.WriteBytes(sync_marker_.size(), sync_marker_.data());
string text = out_.String();
RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), text.size()));
out_.Clear();
return Status::OK();
}
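
// Writes the buffered rows as one compressed block:
//   sync marker          16 bytes
//   record count         VLong
//   key lengths          Text (empty; no keys)
//   keys                 Text (empty)
//   value lengths        Text (empty; each value carries its own length)
//   compressed values    VInt length, then the compressed bytes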
Status HdfsSequenceTableWriter::WriteCompressedBlock() {
WriteStream header;
DCHECK(compress_flag_);
  // Add a sync marker to the start of the block.
  header.WriteBytes(sync_marker_.size(), sync_marker_.data());
  header.WriteVLong(unflushed_rows_);
  // Write empty strings for the key lengths and keys; Impala writes no keys.
  header.WriteEmptyText();
  header.WriteEmptyText();
  // Write an empty string for the value lengths; each buffered value already
  // carries its own VLong length prefix (see ConsumeRow()).
  header.WriteEmptyText();
uint8_t *output;
int64_t output_length;
string text = out_.String();
{
SCOPED_TIMER(parent_->compress_timer());
RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output));
}
header.WriteVInt(output_length);
string head = header.String();
RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()),
head.size()));
RETURN_IF_ERROR(Write(output, output_length));
return Status::OK();
}
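
// Writes 'str_val' to 'buf', escaping every occurrence of the field
// delimiter and of the escape character itself.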
inline void HdfsSequenceTableWriter::WriteEscapedString(const StringValue* str_val,
WriteStream* buf) {
for (int i = 0; i < str_val->len; ++i) {
if (str_val->ptr[i] == field_delim_ || str_val->ptr[i] == escape_char_) {
buf->WriteByte(escape_char_);
}
buf->WriteByte(str_val->ptr[i]);
}
}
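
// Materializes one row into 'buf' as delimited text, substituting the
// table's NULL sentinel for NULL values.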
void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
  // TODO: unify with the text table writer.
int num_non_partition_cols =
table_desc_->num_cols() - table_desc_->num_clustering_cols();
DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
for (int j = 0; j < num_non_partition_cols; ++j) {
void* value = output_expr_ctxs_[j]->GetValue(row);
if (value != NULL) {
if (output_expr_ctxs_[j]->root()->type().type == TYPE_STRING) {
        WriteEscapedString(reinterpret_cast<const StringValue*>(value), buf);
} else {
string str;
output_expr_ctxs_[j]->PrintValue(value, &str);
buf->WriteBytes(str.size(), str.data());
}
} else {
// NULLs in hive are encoded based on the 'serialization.null.format' property.
const string& null_val = table_desc_->null_column_value();
buf->WriteBytes(null_val.size(), null_val.data());
}
// Append field delimiter.
if (j + 1 < num_non_partition_cols) {
buf->WriteByte(field_delim_);
}
}
}
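
// Buffers one encoded row in 'out_'. Uncompressed and record-compressed
// files use the per-record layout
//   record length   int (key length plus value length)
//   key length      int (always 0; Impala writes no keys)
//   value           Text: VLong length, then the bytes (compressed as a unit
//                   when record compression is enabled)
// while block-compressed files simply append length-prefixed rows to 'out_'
// and leave the framing to WriteCompressedBlock().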
inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
++unflushed_rows_;
row_buf_.Clear();
if (compress_flag_ && !record_compression_) {
// Output row for a block compressed sequence file
// write the length as a vlong and then write the contents
EncodeRow(row, &row_buf_);
out_.WriteVLong(row_buf_.Size());
out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
return Status::OK();
}
EncodeRow(row, &row_buf_);
const uint8_t* value_bytes;
int64_t value_length;
  if (record_compression_) {
    // Apply compression to row_buf_. The buffer's length must be prefixed to
    // the buffer prior to compression.
    //
    // TODO: this incurs copy overhead to place the length in front of the
    // buffer prior to compression. We may want to rewrite this to avoid
    // copying.
    string text = row_buf_.String();
    row_buf_.Clear();
    // Encoding as "Text" writes the length before the text.
    row_buf_.WriteText(text.size(), reinterpret_cast<const uint8_t*>(text.data()));
text = row_buf_.String();
uint8_t *tmp;
{
SCOPED_TIMER(parent_->compress_timer());
RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
reinterpret_cast<uint8_t*>(&text[0]), &value_length, &tmp));
}
value_bytes = tmp;
} else {
value_length = row_buf_.Size();
value_bytes = reinterpret_cast<const uint8_t*>(row_buf_.String().data());
}
int rec_len = value_length;
  // If the record is compressed, the length is part of the compressed text;
  // if not, we need to write the length (below) and account for its size.
  if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
  // Total length of the record (key length plus value length).
out_.WriteInt(rec_len);
// Write length of the key (Impala/Hive doesn't write a key)
out_.WriteInt(0);
  // If the record is compressed, the length is already part of the compressed text.
  if (!record_compression_) out_.WriteVLong(value_length);
  // Write out the value (possibly compressed).
  out_.WriteBytes(value_length, value_bytes);
return Status::OK();
}
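
// Writes all buffered rows to the file: as a single compressed block for
// block-compressed output, or as the already-framed record stream otherwise.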
Status HdfsSequenceTableWriter::Flush() {
if (unflushed_rows_ == 0) return Status::OK();
SCOPED_TIMER(parent_->hdfs_write_timer());
if (compress_flag_ && !record_compression_) {
RETURN_IF_ERROR(WriteCompressedBlock());
} else {
string out_str = out_.String();
RETURN_IF_ERROR(
Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
}
out_.Clear();
unflushed_rows_ = 0;
return Status::OK();
}
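
// Releases the reserved block-size memory and frees all allocations from the
// writer's memory pool.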
void HdfsSequenceTableWriter::Close() {
// TODO: double check there is no memory leak.
parent_->mem_tracker()->Release(approx_block_size_);
mem_pool_->FreeAll();
}

} // namespace impala