// blob: c57d0f1109ecafbb17948f4a8037c1b09b065b6f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UDA_TEST_HARNESS_IMPL_H
#define IMPALA_UDA_TEST_HARNESS_IMPL_H
// THIS FILE IS USED BY THE STANDALONE IMPALA UDF DEVELOPMENT KIT.
// IT MUST BE BUILDABLE WITH C++98 AND WITHOUT ANY INTERNAL IMPALA HEADERS.
#include <cstring>
#include <sstream>
#include <string>
#include <vector>
// Use boost::shared_ptr instead of std::shared_ptr so it can be built with C++98
#include <boost/shared_ptr.hpp>
namespace impala_udf {
/// Helper routines the UDA test harness uses to manage intermediate values.
/// The harness checks the correctness of a UDA by simulating several possible
/// distributed execution setups: single node mode (so merge and serialize are unused),
/// single level merge and multi-level merge.
/// The test application is responsible for providing the data and expected result.
///
/// The primary templates below apply to every intermediate type without an explicit
/// specialization; none of them uses 'context' or 'byte_size'.
class UdaTestHarnessUtil {
 public:
  /// Returns a value-initialized intermediate of type T.
  template<typename T>
  static T CreateIntermediate(FunctionContext* context, int byte_size) {
    (void)context;
    (void)byte_size;
    return T();
  }

  /// Releases an intermediate value. The generic version has nothing to release.
  template<typename T>
  static void FreeIntermediate(FunctionContext* context, const T& v) {
    (void)context;
    (void)v;
  }

  /// Returns a copy of 'src'. This stands in for shipping the intermediate's bytes
  /// from one node to another.
  template<typename T>
  static T CopyIntermediate(FunctionContext* context, int byte_size, const T& src) {
    (void)context;
    (void)byte_size;
    return src;
  }
};
/// BufferVal specialization: the intermediate is a buffer of 'byte_size' bytes obtained
/// from context->Allocate().
/// Declared inline because an explicit specialization of a function template is an
/// ordinary (non-template) function definition; without 'inline', including this header
/// from more than one translation unit yields duplicate-symbol link errors.
template<>
inline BufferVal UdaTestHarnessUtil::CreateIntermediate(
    FunctionContext* context, int byte_size) {
  return reinterpret_cast<BufferVal>(context->Allocate(byte_size));
}
/// BufferVal specialization: hands the buffer 'v' back to 'context' via Free().
/// Declared inline because an explicit specialization of a function template is an
/// ordinary (non-template) function definition; without 'inline', including this header
/// from more than one translation unit yields duplicate-symbol link errors.
template<>
inline void UdaTestHarnessUtil::FreeIntermediate(
    FunctionContext* context, const BufferVal& v) {
  context->Free(v);
}
/// BufferVal specialization: allocates 'byte_size' bytes from 'context' and copies that
/// many bytes from 'src' into the new buffer, which is returned. The caller releases
/// the returned buffer with FreeIntermediate() on the same context.
/// Declared inline because an explicit specialization of a function template is an
/// ordinary (non-template) function definition; without 'inline', including this header
/// from more than one translation unit yields duplicate-symbol link errors.
template<>
inline BufferVal UdaTestHarnessUtil::CopyIntermediate(
    FunctionContext* context, int byte_size, const BufferVal& src) {
  BufferVal v = reinterpret_cast<BufferVal>(context->Allocate(byte_size));
  // NOTE(review): skip the copy when no destination buffer was obtained, instead of
  // writing through a null pointer. Presumably Allocate() can return NULL on failure;
  // confirm against FunctionContext::Allocate's contract.
  if (v != NULL) std::memcpy(v, src, byte_size);
  return v;
}
/// Reports whether 'context' is free of errors. If the context has an error, its
/// message is stored in error_msg_ (prefixed with "UDA set error to: ") and false is
/// returned. Otherwise error_msg_ is left untouched and true is returned.
template<typename RESULT, typename INTERMEDIATE>
bool UdaTestHarnessBase<RESULT, INTERMEDIATE>::CheckContext(FunctionContext* context) {
  if (!context->has_error()) return true;

  std::stringstream msg;
  msg << "UDA set error to: " << context->error_msg();
  error_msg_ = msg.str();
  return false;
}
/// Compares two results. Uses result_comparator_fn_ when one has been set, and falls
/// back to operator== otherwise.
template<typename RESULT, typename INTERMEDIATE>
bool UdaTestHarnessBase<RESULT, INTERMEDIATE>::CheckResult(
    const RESULT& x, const RESULT& y) {
  if (result_comparator_fn_ != NULL) {
    return result_comparator_fn_(x, y);
  }
  return x == y;
}
/// Runs the UDA in the execution mode(s) selected by 'mode', validating that the result
/// is 'expected' each time.
///
/// - SINGLE_NODE: one run through ExecuteSingleNode().
/// - ONE_LEVEL:   one run through ExecuteOneLevel() per entry of 'num_nodes'.
/// - TWO_LEVEL:   one run through ExecuteTwoLevel() per pair (num_nodes[i], num_nodes[j])
///                with j <= i.
/// - ALL:         all of the above, in that order.
///
/// Returns true if every run matched 'expected' and error_msg_ stayed empty. Returns
/// false at the first failing configuration, with error_msg_ describing the failure.
///
/// Each run uses a freshly created test context that goes out of scope before
/// error_msg_ is inspected. NOTE(review): presumably ScopedFunctionContext's destructor
/// can report errors through the harness pointer it is given - confirm.
template<typename RESULT, typename INTERMEDIATE>
bool UdaTestHarnessBase<RESULT, INTERMEDIATE>::Execute(
    const RESULT& expected, UdaExecutionMode mode) {
  error_msg_ = "";
  RESULT result;

  FunctionContext::TypeDesc return_type;
  std::vector<FunctionContext::TypeDesc> arg_types; // TODO

  if (mode == ALL || mode == SINGLE_NODE) {
    {
      ScopedFunctionContext context(
          UdfTestHarness::CreateTestContext(return_type, arg_types), this);
      result = ExecuteSingleNode(&context);
      if (error_msg_.empty() && !CheckResult(result, expected)) {
        std::stringstream ss;
        ss << "UDA failed running in single node execution." << '\n'
           << "Expected: " << DebugString(expected)
           << " Actual: " << DebugString(result);
        error_msg_ = ss.str();
      }
    }
    if (!error_msg_.empty()) return false;
  }

  const int num_nodes[] = { 1, 2, 10, 20, 100 };
  // Signed element count so the loop indices below are not compared against size_t.
  const int num_configs = static_cast<int>(sizeof(num_nodes) / sizeof(num_nodes[0]));

  if (mode == ALL || mode == ONE_LEVEL) {
    for (int i = 0; i < num_configs; ++i) {
      {
        ScopedFunctionContext context(
            UdfTestHarness::CreateTestContext(return_type, arg_types), this);
        result = ExecuteOneLevel(num_nodes[i], &context);
        if (error_msg_.empty() && !CheckResult(result, expected)) {
          std::stringstream ss;
          ss << "UDA failed running in one level distributed mode with "
             << num_nodes[i] << " nodes." << '\n'
             << "Expected: " << DebugString(expected)
             << " Actual: " << DebugString(result);
          error_msg_ = ss.str();
        }
      }
      // Stop at the first failing configuration so that later runs cannot overwrite
      // error_msg_ and no work is done after the outcome is already known.
      if (!error_msg_.empty()) return false;
    }
  }

  if (mode == ALL || mode == TWO_LEVEL) {
    for (int i = 0; i < num_configs; ++i) {
      for (int j = 0; j <= i; ++j) {
        {
          ScopedFunctionContext context(
              UdfTestHarness::CreateTestContext(return_type, arg_types), this);
          result = ExecuteTwoLevel(num_nodes[i], num_nodes[j], &context);
          if (error_msg_.empty() && !CheckResult(result, expected)) {
            std::stringstream ss;
            ss << "UDA failed running in two level distributed mode with "
               << num_nodes[i] << " nodes in the first level and "
               << num_nodes[j] << " nodes in the second level." << '\n'
               << "Expected: " << DebugString(expected)
               << " Actual: " << DebugString(result);
            error_msg_ = ss.str();
          }
        }
        // Same early exit as in the one level case.
        if (!error_msg_.empty()) return false;
      }
    }
  }
  return true;
}
/// Runs the UDA the way a single node would: init, one Update() per input value, then
/// finalize. Merge and serialize are not involved.
/// Returns the finalized result, or RESULT::null() if the context reported an error
/// after init, after the updates, or after finalize (CheckContext() records the message
/// in error_msg_).
/// NOTE(review): the two early error returns leave without calling FreeIntermediate()
/// on the intermediate - confirm whether that is intended.
template<typename RESULT, typename INTERMEDIATE>
RESULT UdaTestHarnessBase<RESULT, INTERMEDIATE>::ExecuteSingleNode(
    ScopedFunctionContext* context) {
  FunctionContext* const ctx = context->get();

  INTERMEDIATE state = UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
      ctx, fixed_buffer_byte_size_);
  init_fn_(ctx, &state);
  if (!CheckContext(ctx)) return RESULT::null();

  for (int row = 0; row < num_input_values_; ++row) Update(row, ctx, &state);
  if (!CheckContext(ctx)) return RESULT::null();

  // No merge or serialize step: go straight from the updated state to the result.
  RESULT result = finalize_fn_(ctx, state);
  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(ctx, state);
  if (!CheckContext(ctx)) return RESULT::null();
  return result;
}
/// Simulates a one level distributed execution with 'num_nodes' nodes.
/// Each node gets its own test context and intermediate. Input values are spread over
/// the nodes round-robin (value i goes to node i % num_nodes). Each node's intermediate
/// is then serialized (if serialize_fn_ is set), copied into 'result_context' and merged
/// into a single intermediate there, which is finalized to produce the result.
/// Returns RESULT::null() as soon as CheckContext() finds an error on any context.
template<typename RESULT, typename INTERMEDIATE>
RESULT UdaTestHarnessBase<RESULT, INTERMEDIATE>::ExecuteOneLevel(int num_nodes,
    ScopedFunctionContext* result_context) {
  std::vector<boost::shared_ptr<ScopedFunctionContext> > node_contexts;
  std::vector<INTERMEDIATE> node_states;
  node_contexts.resize(num_nodes);

  FunctionContext::TypeDesc return_type;
  std::vector<FunctionContext::TypeDesc> arg_types; // TODO

  // Set up one context and one initialized intermediate per node.
  for (int node = 0; node < num_nodes; ++node) {
    FunctionContext* node_ctx =
        UdfTestHarness::CreateTestContext(return_type, arg_types);
    node_contexts[node].reset(new ScopedFunctionContext(node_ctx, this));
    node_states.push_back(UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
        node_ctx, fixed_buffer_byte_size_));
    init_fn_(node_ctx, &node_states[node]);
    if (!CheckContext(node_ctx)) return RESULT::null();
  }

  // Set up the intermediate everything gets merged into.
  FunctionContext* const final_ctx = result_context->get();
  INTERMEDIATE merged = UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
      final_ctx, fixed_buffer_byte_size_);
  init_fn_(final_ctx, &merged);
  if (!CheckContext(final_ctx)) return RESULT::null();

  // Feed the input values to the nodes, round-robin.
  for (int row = 0; row < num_input_values_; ++row) {
    const int target = row % num_nodes;
    Update(row, node_contexts[target]->get(), &node_states[target]);
  }

  // Ship every node's intermediate to the final context and merge it there.
  for (int node = 0; node < num_nodes; ++node) {
    FunctionContext* const node_ctx = node_contexts[node]->get();
    if (!CheckContext(node_ctx)) return RESULT::null();

    INTERMEDIATE serialized = node_states[node];
    if (serialize_fn_ != NULL) {
      serialized = serialize_fn_(node_ctx, node_states[node]);
    }
    INTERMEDIATE copy = UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
        final_ctx, fixed_buffer_byte_size_, serialized);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(node_ctx, node_states[node]);

    merge_fn_(final_ctx, copy, &merged);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(final_ctx, copy);

    if (!CheckContext(node_ctx)) return RESULT::null();
    // This node is finished; drop its context now.
    node_contexts[node].reset();
  }
  if (!CheckContext(final_ctx)) return RESULT::null();

  RESULT result = finalize_fn_(final_ctx, merged);
  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(final_ctx, merged);
  if (!CheckContext(final_ctx)) return RESULT::null();
  return result;
}
/// Simulates a two level distributed execution with 'num1' first level nodes and
/// 'num2' second level nodes.
/// Input values are spread over the first level round-robin (value i goes to node
/// i % num1). First level intermediate i is serialized (if serialize_fn_ is set), copied
/// to second level node i % num2 and merged there. The second level intermediates are
/// then serialized, copied into 'result_context' and merged into one intermediate, which
/// is finalized to produce the result.
/// Returns RESULT::null() as soon as CheckContext() finds an error on any context.
template<typename RESULT, typename INTERMEDIATE>
RESULT UdaTestHarnessBase<RESULT, INTERMEDIATE>::ExecuteTwoLevel(
    int num1, int num2, ScopedFunctionContext* result_context) {
  std::vector<boost::shared_ptr<ScopedFunctionContext> > first_contexts;
  std::vector<boost::shared_ptr<ScopedFunctionContext> > second_contexts;
  std::vector<INTERMEDIATE> first_states;
  std::vector<INTERMEDIATE> second_states;
  first_contexts.resize(num1);
  second_contexts.resize(num2);

  FunctionContext::TypeDesc return_type;
  std::vector<FunctionContext::TypeDesc> arg_types; // TODO

  // One context and one initialized intermediate per first level node ...
  for (int node = 0; node < num1; ++node) {
    FunctionContext* node_ctx =
        UdfTestHarness::CreateTestContext(return_type, arg_types);
    first_contexts[node].reset(new ScopedFunctionContext(node_ctx, this));
    first_states.push_back(UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
        node_ctx, fixed_buffer_byte_size_));
    init_fn_(node_ctx, &first_states[node]);
    if (!CheckContext(node_ctx)) return RESULT::null();
  }

  // ... and the same for every second level node.
  for (int node = 0; node < num2; ++node) {
    FunctionContext* node_ctx =
        UdfTestHarness::CreateTestContext(return_type, arg_types);
    second_contexts[node].reset(new ScopedFunctionContext(node_ctx, this));
    second_states.push_back(UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
        node_ctx, fixed_buffer_byte_size_));
    init_fn_(node_ctx, &second_states[node]);
    if (!CheckContext(node_ctx)) return RESULT::null();
  }

  // The intermediate that the second level is merged into.
  FunctionContext* const final_ctx = result_context->get();
  INTERMEDIATE final_state = UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
      final_ctx, fixed_buffer_byte_size_);
  init_fn_(final_ctx, &final_state);
  if (!CheckContext(final_ctx)) return RESULT::null();

  // Feed the input values to the first level nodes, round-robin.
  for (int row = 0; row < num_input_values_; ++row) {
    const int target = row % num1;
    Update(row, first_contexts[target]->get(), &first_states[target]);
  }

  // First level -> second level: serialize, copy across, merge.
  for (int node = 0; node < num1; ++node) {
    FunctionContext* const src_ctx = first_contexts[node]->get();
    if (!CheckContext(src_ctx)) return RESULT::null();

    const int target = node % num2;
    FunctionContext* const dst_ctx = second_contexts[target]->get();

    INTERMEDIATE serialized = first_states[node];
    if (serialize_fn_ != NULL) {
      serialized = serialize_fn_(src_ctx, first_states[node]);
    }
    INTERMEDIATE copy = UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
        dst_ctx, fixed_buffer_byte_size_, serialized);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(src_ctx, first_states[node]);

    merge_fn_(dst_ctx, copy, &second_states[target]);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(dst_ctx, copy);

    if (!CheckContext(src_ctx)) return RESULT::null();
    // This first level node is finished; drop its context now.
    first_contexts[node].reset();
  }

  // Second level -> final: serialize, copy across, merge.
  for (int node = 0; node < num2; ++node) {
    FunctionContext* const src_ctx = second_contexts[node]->get();
    if (!CheckContext(src_ctx)) return RESULT::null();

    INTERMEDIATE serialized = second_states[node];
    if (serialize_fn_ != NULL) {
      serialized = serialize_fn_(src_ctx, second_states[node]);
    }
    INTERMEDIATE copy = UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
        final_ctx, fixed_buffer_byte_size_, serialized);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(src_ctx, second_states[node]);

    merge_fn_(final_ctx, copy, &final_state);
    UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(final_ctx, copy);

    if (!CheckContext(src_ctx)) return RESULT::null();
    // This second level node is finished; drop its context now.
    second_contexts[node].reset();
  }
  if (!CheckContext(final_ctx)) return RESULT::null();

  RESULT result = finalize_fn_(final_ctx, final_state);
  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(final_ctx, final_state);
  if (!CheckContext(final_ctx)) return RESULT::null();
  return result;
}
/// Runs the single-argument UDA over 'values' in the execution mode(s) selected by
/// 'mode' and checks that each run produces 'expected'. Returns the outcome of
/// BaseClass::Execute().
///
/// input_ is filled with pointers to the elements of 'values', not copies, so 'values'
/// must stay alive and unmodified for the duration of this call. The pointers remain in
/// input_ after the call returns and must not be used once 'values' is gone.
template<typename RESULT, typename INTERMEDIATE, typename INPUT>
bool UdaTestHarness<RESULT, INTERMEDIATE, INPUT>::Execute(
    const std::vector<INPUT>& values, const RESULT& expected,
    UdaExecutionMode mode) {
  // Index with the vector's own size type to avoid a signed/unsigned comparison.
  typedef typename std::vector<INPUT>::size_type SizeType;
  const SizeType num_values = values.size();

  input_.resize(num_values);
  BaseClass::num_input_values_ = num_values;
  for (SizeType i = 0; i < num_values; ++i) {
    input_[i] = &values[i];
  }
  return BaseClass::Execute(expected, mode);
}
/// Applies the UDA's update function to input value number 'idx', accumulating into
/// 'dst'.
template<typename RESULT, typename INTERMEDIATE, typename INPUT>
void UdaTestHarness<RESULT, INTERMEDIATE, INPUT>::Update(
    int idx, FunctionContext* context, INTERMEDIATE* dst) {
  const INPUT& value = *input_[idx];
  update_fn_(context, value, dst);
}
/// Runs the two-argument UDA in the execution mode(s) selected by 'mode' and checks
/// that each run produces 'expected'. Row i of the input is (values1[i], values2[i]).
/// Fails with an error message, without running anything, when the two vectors differ
/// in size. The vectors are referenced, not copied, so they must outlive this call.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2>
bool UdaTestHarness2<RESULT, INTERMEDIATE, INPUT1, INPUT2>::Execute(
    const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
    const RESULT& expected, UdaExecutionMode mode) {
  const bool sizes_match = (values1.size() == values2.size());
  if (!sizes_match) {
    BaseClass::error_msg_ =
        "UdaTestHarness::Execute: values1 and values2 must be the same size.";
    return false;
  }

  input1_ = &values1;
  input2_ = &values2;
  BaseClass::num_input_values_ = values1.size();
  return BaseClass::Execute(expected, mode);
}
/// Applies the UDA's update function to input row 'idx' (one value from each input
/// vector), accumulating into 'dst'.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2>
void UdaTestHarness2<RESULT, INTERMEDIATE, INPUT1, INPUT2>::Update(
    int idx, FunctionContext* context, INTERMEDIATE* dst) {
  const INPUT1& arg1 = (*input1_)[idx];
  const INPUT2& arg2 = (*input2_)[idx];
  update_fn_(context, arg1, arg2, dst);
}
/// Runs the three-argument UDA in the execution mode(s) selected by 'mode' and checks
/// that each run produces 'expected'. Row i of the input is
/// (values1[i], values2[i], values3[i]).
/// Fails with an error message, without running anything, unless all three vectors have
/// the same size. The vectors are referenced, not copied, so they must outlive this
/// call.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
    typename INPUT3>
bool UdaTestHarness3<RESULT, INTERMEDIATE, INPUT1, INPUT2, INPUT3>::Execute(
    const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
    const std::vector<INPUT3>& values3, const RESULT& expected, UdaExecutionMode mode) {
  const bool sizes_match =
      values1.size() == values2.size() && values1.size() == values3.size();
  if (!sizes_match) {
    BaseClass::error_msg_ =
        "UdaTestHarness::Execute: input values vectors must be the same size.";
    return false;
  }

  input1_ = &values1;
  input2_ = &values2;
  input3_ = &values3;
  BaseClass::num_input_values_ = values1.size();
  return BaseClass::Execute(expected, mode);
}
/// Applies the UDA's update function to input row 'idx' (one value from each of the
/// three input vectors), accumulating into 'dst'.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
    typename INPUT3>
void UdaTestHarness3<RESULT, INTERMEDIATE, INPUT1, INPUT2, INPUT3>::Update(
    int idx, FunctionContext* context, INTERMEDIATE* dst) {
  const INPUT1& arg1 = (*input1_)[idx];
  const INPUT2& arg2 = (*input2_)[idx];
  const INPUT3& arg3 = (*input3_)[idx];
  update_fn_(context, arg1, arg2, arg3, dst);
}
/// Runs the four-argument UDA in the execution mode(s) selected by 'mode' and checks
/// that each run produces 'expected'. Row i of the input is
/// (values1[i], values2[i], values3[i], values4[i]).
/// Fails with an error message, without running anything, unless all four vectors have
/// the same size. The vectors are referenced, not copied, so they must outlive this
/// call.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
    typename INPUT3, typename INPUT4>
bool UdaTestHarness4<RESULT, INTERMEDIATE, INPUT1, INPUT2, INPUT3, INPUT4>::Execute(
    const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
    const std::vector<INPUT3>& values3, const std::vector<INPUT4>& values4,
    const RESULT& expected, UdaExecutionMode mode) {
  const bool sizes_match = values1.size() == values2.size() &&
      values1.size() == values3.size() &&
      values1.size() == values4.size();
  if (!sizes_match) {
    BaseClass::error_msg_ =
        "UdaTestHarness::Execute: input values vectors must be the same size.";
    return false;
  }

  input1_ = &values1;
  input2_ = &values2;
  input3_ = &values3;
  input4_ = &values4;
  BaseClass::num_input_values_ = values1.size();
  return BaseClass::Execute(expected, mode);
}
/// Applies the UDA's update function to input row 'idx' (one value from each of the
/// four input vectors), accumulating into 'dst'.
template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
    typename INPUT3, typename INPUT4>
void UdaTestHarness4<RESULT, INTERMEDIATE, INPUT1, INPUT2, INPUT3, INPUT4>::Update(
    int idx, FunctionContext* context, INTERMEDIATE* dst) {
  const INPUT1& arg1 = (*input1_)[idx];
  const INPUT2& arg2 = (*input2_)[idx];
  const INPUT3& arg3 = (*input3_)[idx];
  const INPUT4& arg4 = (*input4_)[idx];
  update_fn_(context, arg1, arg2, arg3, arg4, dst);
}
}
#endif