blob: af496842026cc1d6a6ff03f0d2b2a238ef0ad198 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-table-writer.h"
#include <sstream>
#include "common/names.h"
#include "runtime/mem-tracker.h"
#include "exec/hdfs-table-sink.h"
#include "util/hdfs-util.h"
namespace impala {
HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent,
RuntimeState* state, OutputPartition* output,
const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* table_desc)
: parent_(parent),
state_(state),
output_(output),
table_desc_(table_desc),
output_expr_evals_(parent->output_expr_evals()) {
int num_non_partition_cols =
table_desc_->num_cols() - table_desc_->num_clustering_cols();
DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
<< parent_->DebugString();
}
Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) {
DCHECK_GE(len, 0);
DCHECK(output_->tmp_hdfs_file != nullptr);
int ret = hdfsWrite(output_->hdfs_connection, output_->tmp_hdfs_file, data, len);
if (ret == -1) {
string error_msg = GetHdfsErrorMsg("");
stringstream msg;
msg << "Failed to write data (length: " << len
<< ") to Hdfs file: " << output_->current_file_name
<< " " << error_msg;
return Status(msg.str());
}
COUNTER_ADD(parent_->bytes_written_counter(), len);
stats_.set_bytes_written(stats_.bytes_written() + len);
return Status::OK();
}
}