blob: 81f158c25c0493bf8cc8372f755a5a49c8913930 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/object-pool.h"
#include "exec/hdfs-table-sink.h"
#include "exec/multi-table-sink.h"
#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
namespace impala {
Status MultiTableSinkConfig::Init(
const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
for (const TDataSink& child_sink : tsink.child_data_sinks) {
// We only allow table sinks for now.
DCHECK(child_sink.__isset.table_sink);
DataSinkConfig* data_sink_config;
RETURN_IF_ERROR(DataSinkConfig::CreateConfig(child_sink, input_row_desc,
state, &data_sink_config));
DCHECK(data_sink_config != nullptr);
table_sink_configs_.push_back(static_cast<TableSinkBaseConfig*>(data_sink_config));
}
return Status::OK();
}
DataSink* MultiTableSinkConfig::CreateSink(RuntimeState* state) const {
TDataSinkId sink_id = state->fragment().idx;
return state->obj_pool()->Add(
new MultiTableSink(sink_id, *this, *tsink_, state));
}
void MultiTableSinkConfig::Close() {
for (TableSinkBaseConfig* table_sink_config : table_sink_configs_) {
table_sink_config->Close();
}
DataSinkConfig::Close();
}
MultiTableSink::MultiTableSink(TDataSinkId sink_id,
const MultiTableSinkConfig& sink_config, const TDataSink& dsink,
RuntimeState* state) : DataSink(sink_id, sink_config, "MultiTableSink", state) {
for (TableSinkBaseConfig* tbl_sink_config : sink_config.table_sink_configs()) {
TableSinkBase* tsink_base =
DCHECK_NOTNULL(dynamic_cast<TableSinkBase*>(tbl_sink_config->CreateSink(state)));
table_sinks_.push_back(tsink_base);
profile()->AddChild(tsink_base->profile());
}
}
Status MultiTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
for (TableSinkBase* tsink : table_sinks_) {
RETURN_IF_ERROR(tsink->Prepare(state, parent_mem_tracker));
}
return Status::OK();
}
Status MultiTableSink::Open(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::Open(state));
for (TableSinkBase* tsink : table_sinks_) {
RETURN_IF_ERROR(tsink->Open(state));
}
return Status::OK();
}
Status MultiTableSink::Send(RuntimeState* state, RowBatch* batch) {
for (TableSinkBase* tsink : table_sinks_) {
RETURN_IF_ERROR(tsink->Send(state, batch));
}
return Status::OK();
}
Status MultiTableSink::FlushFinal(RuntimeState* state) {
DCHECK(!closed_);
for (TableSinkBase* tsink : table_sinks_) {
RETURN_IF_ERROR(tsink->FlushFinal(state));
}
return Status::OK();
}
void MultiTableSink::Close(RuntimeState* state) {
for (TableSinkBase* tsink : table_sinks_) {
tsink->Close(state);
}
DataSink::Close(state);
DCHECK(closed_);
}
}