blob: 0b58e3f9e57c696c1ba9607dd396a46f130263a4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/external-data-source-executor.h"
#include <boost/thread.hpp>
#include <list>
#include <string>
#include "common/logging.h"
#include "rpc/jni-thrift-util.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
#include "util/parse-util.h"
#include "util/metrics.h"
#include "common/names.h"
using namespace impala;
using namespace impala::extdatasource;
/// Static state shared across instances of the JNI ExternalDataSourceExecutor.
class ExternalDataSourceExecutor::JniState {
public:
/// Gets the singleton instance. Creation of the instance is not thread-safe
/// (until C++11), but called once by ExternalDataSourceExecutor::Init() on startup,
/// at which time JniState::Init() (see below) is called once.
static ExternalDataSourceExecutor::JniState& GetInstance() {
static ExternalDataSourceExecutor::JniState state;
return state;
}
/// Initializes the JniState. Called exactly once by ExternalDataSourceExecutor::Init()
/// process startup.
Status Init(MetricGroup* metrics) {
DCHECK(executor_class_ == NULL) << "JniState was already initialized.";
JniMethodDescriptor methods[] = {
{"<init>",
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V",
&ctor_},
{"open", "([B)[B", &open_id_},
{"getNext", "([B)[B", &get_next_id_},
{"close", "([B)[B", &close_id_}};
JNIEnv* env = JniUtil::GetJNIEnv();
JniLocalFrame jni_frame;
RETURN_IF_ERROR(jni_frame.push(env));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
"org/apache/impala/extdatasource/ExternalDataSourceExecutor",
&executor_class_));
uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
for (int i = 0; i < num_methods; ++i) {
RETURN_IF_ERROR(JniUtil::LoadJniMethod(env, executor_class_, &(methods[i])));
}
get_num_cache_hits_id_ = env->GetStaticMethodID(executor_class_,
"getNumClassCacheHits", "()J");
RETURN_ERROR_IF_EXC(env);
get_num_cache_misses_id_ = env->GetStaticMethodID(executor_class_,
"getNumClassCacheMisses", "()J");
RETURN_ERROR_IF_EXC(env);
num_class_cache_hits_ = metrics->AddCounter(
"external-data-source.class-cache.hits", 0);
num_class_cache_misses_ = metrics->AddCounter(
"external-data-source.class-cache.misses", 0);
return Status::OK();
}
/// Updates the class cache metrics via the static JNI methods on
/// ExternalDataSourceExecutor.
Status UpdateClassCacheMetrics() const {
DCHECK(executor_class_ != NULL) << "JniState was not initialized.";
JNIEnv* env = JniUtil::GetJNIEnv();
int64_t num_cache_hits = env->CallStaticLongMethod(executor_class_,
get_num_cache_hits_id_);
RETURN_ERROR_IF_EXC(env);
num_class_cache_hits_->SetValue(num_cache_hits);
int64_t num_cache_misses = env->CallStaticLongMethod(executor_class_,
get_num_cache_misses_id_);
RETURN_ERROR_IF_EXC(env);
num_class_cache_misses_->SetValue(num_cache_misses);
return Status::OK();
}
/// Class reference for org.apache.impala.extdatasource.ExternalDataSourceExecutor
jclass executor_class_;
jmethodID ctor_;
jmethodID open_id_; // ExternalDataSourceExecutor.open()
jmethodID get_next_id_; // ExternalDataSourceExecutor.getNext()
jmethodID close_id_; // ExternalDataSourceExecutor.close()
// Static methods for getting the number of class cache hits/misses.
jmethodID get_num_cache_hits_id_;
jmethodID get_num_cache_misses_id_;
IntCounter* num_class_cache_hits_;
IntCounter* num_class_cache_misses_;
private:
JniState() : executor_class_(NULL) { }
DISALLOW_COPY_AND_ASSIGN(JniState);
};
Status ExternalDataSourceExecutor::InitJNI(MetricGroup* metrics) {
// Initializes the JniState singleton and initializes the metric values.
JniState& s = JniState::GetInstance();
RETURN_IF_ERROR(s.Init(metrics));
RETURN_IF_ERROR(s.UpdateClassCacheMetrics());
return Status::OK();
}
ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
DCHECK(!is_initialized_);
}
Status ExternalDataSourceExecutor::Init(const string& jar_path,
const string& class_name, const string& api_version, const string& init_string) {
DCHECK(!is_initialized_);
LibCacheEntryHandle handle;
string local_jar_path;
// TODO(IMPALA-6727): pass the mtime from the coordinator. for now, skip the mtime
// check (-1).
RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
jar_path, LibCache::TYPE_JAR, -1, &handle, &local_jar_path));
JNIEnv* jni_env = JniUtil::GetJNIEnv();
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
JniLocalFrame jni_frame;
RETURN_IF_ERROR(jni_frame.push(jni_env));
jstring jar_path_jstr = jni_env->NewStringUTF(local_jar_path.c_str());
RETURN_ERROR_IF_EXC(jni_env);
jstring class_name_jstr = jni_env->NewStringUTF(class_name.c_str());
RETURN_ERROR_IF_EXC(jni_env);
jstring api_version_jstr = jni_env->NewStringUTF(api_version.c_str());
RETURN_ERROR_IF_EXC(jni_env);
jstring init_string_jstr = jni_env->NewStringUTF(init_string.c_str());
RETURN_ERROR_IF_EXC(jni_env);
const JniState& s = JniState::GetInstance();
jobject local_exec = jni_env->NewObject(s.executor_class_, s.ctor_, jar_path_jstr,
class_name_jstr, api_version_jstr, init_string_jstr);
RETURN_ERROR_IF_EXC(jni_env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, local_exec, &executor_));
RETURN_IF_ERROR(s.UpdateClassCacheMetrics());
is_initialized_ = true;
return Status::OK();
}
// JniUtil::CallJniMethod() does not compile when the template parameters are in
// another namespace. The issue seems to be that SerializeThriftMsg/DeserializeThriftMsg
// are not being generated for these types.
// TODO: Understand what's happening, remove, and use JniUtil::CallJniMethod
template <typename T, typename R>
Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg,
R* response) {
JNIEnv* jni_env = JniUtil::GetJNIEnv();
jbyteArray request_bytes;
JniLocalFrame jni_frame;
RETURN_IF_ERROR(jni_frame.push(jni_env));
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
jbyteArray result_bytes = static_cast<jbyteArray>(
jni_env->CallObjectMethod(obj, method, request_bytes));
RETURN_ERROR_IF_EXC(jni_env);
RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
return Status::OK();
}
Status ExternalDataSourceExecutor::Open(const TOpenParams& params, TOpenResult* result) {
DCHECK(is_initialized_);
const JniState& s = JniState::GetInstance();
return CallJniMethod(executor_, s.open_id_, params, result);
}
Status ExternalDataSourceExecutor::GetNext(const TGetNextParams& params,
TGetNextResult* result) {
DCHECK(is_initialized_);
const JniState& s = JniState::GetInstance();
return CallJniMethod(executor_, s.get_next_id_, params, result);
}
Status ExternalDataSourceExecutor::Close(const TCloseParams& params,
TCloseResult* result) {
DCHECK(is_initialized_);
const JniState& s = JniState::GetInstance();
Status status = CallJniMethod(executor_, s.close_id_, params,
result);
JNIEnv* env = JniUtil::GetJNIEnv();
if (executor_ != NULL) env->DeleteGlobalRef(executor_);
is_initialized_ = false;
return status;
}