| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/external-data-source-executor.h" |
| |
| #include <boost/thread.hpp> |
| #include <list> |
| #include <string> |
| |
| #include "common/logging.h" |
| #include "rpc/jni-thrift-util.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/lib-cache.h" |
| #include "util/parse-util.h" |
| #include "util/metrics.h" |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| using namespace impala::extdatasource; |
| |
| /// Static state shared across instances of the JNI ExternalDataSourceExecutor. |
| class ExternalDataSourceExecutor::JniState { |
| public: |
| /// Gets the singleton instance. Creation of the instance is not thread-safe |
| /// (until C++11), but called once by ExternalDataSourceExecutor::Init() on startup, |
| /// at which time JniState::Init() (see below) is called once. |
| static ExternalDataSourceExecutor::JniState& GetInstance() { |
| static ExternalDataSourceExecutor::JniState state; |
| return state; |
| } |
| |
| /// Initializes the JniState. Called exactly once by ExternalDataSourceExecutor::Init() |
| /// process startup. |
| Status Init(MetricGroup* metrics) { |
| DCHECK(executor_class_ == NULL) << "JniState was already initialized."; |
| JniMethodDescriptor methods[] = { |
| {"<init>", |
| "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", |
| &ctor_}, |
| {"open", "([B)[B", &open_id_}, |
| {"getNext", "([B)[B", &get_next_id_}, |
| {"close", "([B)[B", &close_id_}}; |
| |
| JNIEnv* env = JniUtil::GetJNIEnv(); |
| |
| JniLocalFrame jni_frame; |
| RETURN_IF_ERROR(jni_frame.push(env)); |
| |
| RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, |
| "org/apache/impala/extdatasource/ExternalDataSourceExecutor", |
| &executor_class_)); |
| uint32_t num_methods = sizeof(methods) / sizeof(methods[0]); |
| for (int i = 0; i < num_methods; ++i) { |
| RETURN_IF_ERROR(JniUtil::LoadJniMethod(env, executor_class_, &(methods[i]))); |
| } |
| |
| get_num_cache_hits_id_ = env->GetStaticMethodID(executor_class_, |
| "getNumClassCacheHits", "()J"); |
| RETURN_ERROR_IF_EXC(env); |
| get_num_cache_misses_id_ = env->GetStaticMethodID(executor_class_, |
| "getNumClassCacheMisses", "()J"); |
| RETURN_ERROR_IF_EXC(env); |
| |
| num_class_cache_hits_ = metrics->AddCounter( |
| "external-data-source.class-cache.hits", 0); |
| num_class_cache_misses_ = metrics->AddCounter( |
| "external-data-source.class-cache.misses", 0); |
| return Status::OK(); |
| } |
| |
| |
| /// Updates the class cache metrics via the static JNI methods on |
| /// ExternalDataSourceExecutor. |
| Status UpdateClassCacheMetrics() const { |
| DCHECK(executor_class_ != NULL) << "JniState was not initialized."; |
| JNIEnv* env = JniUtil::GetJNIEnv(); |
| int64_t num_cache_hits = env->CallStaticLongMethod(executor_class_, |
| get_num_cache_hits_id_); |
| RETURN_ERROR_IF_EXC(env); |
| num_class_cache_hits_->SetValue(num_cache_hits); |
| int64_t num_cache_misses = env->CallStaticLongMethod(executor_class_, |
| get_num_cache_misses_id_); |
| RETURN_ERROR_IF_EXC(env); |
| num_class_cache_misses_->SetValue(num_cache_misses); |
| return Status::OK(); |
| } |
| |
| /// Class reference for org.apache.impala.extdatasource.ExternalDataSourceExecutor |
| jclass executor_class_; |
| |
| jmethodID ctor_; |
| jmethodID open_id_; // ExternalDataSourceExecutor.open() |
| jmethodID get_next_id_; // ExternalDataSourceExecutor.getNext() |
| jmethodID close_id_; // ExternalDataSourceExecutor.close() |
| |
| // Static methods for getting the number of class cache hits/misses. |
| jmethodID get_num_cache_hits_id_; |
| jmethodID get_num_cache_misses_id_; |
| |
| IntCounter* num_class_cache_hits_; |
| IntCounter* num_class_cache_misses_; |
| |
| private: |
| JniState() : executor_class_(NULL) { } |
| |
| DISALLOW_COPY_AND_ASSIGN(JniState); |
| }; |
| |
| Status ExternalDataSourceExecutor::InitJNI(MetricGroup* metrics) { |
| // Initializes the JniState singleton and initializes the metric values. |
| JniState& s = JniState::GetInstance(); |
| RETURN_IF_ERROR(s.Init(metrics)); |
| RETURN_IF_ERROR(s.UpdateClassCacheMetrics()); |
| return Status::OK(); |
| } |
| |
| ExternalDataSourceExecutor::~ExternalDataSourceExecutor() { |
| DCHECK(!is_initialized_); |
| } |
| |
| Status ExternalDataSourceExecutor::Init(const string& jar_path, |
| const string& class_name, const string& api_version, const string& init_string) { |
| DCHECK(!is_initialized_); |
| LibCacheEntryHandle handle; |
| string local_jar_path; |
| // TODO(IMPALA-6727): pass the mtime from the coordinator. for now, skip the mtime |
| // check (-1). |
| RETURN_IF_ERROR(LibCache::instance()->GetLocalPath( |
| jar_path, LibCache::TYPE_JAR, -1, &handle, &local_jar_path)); |
| JNIEnv* jni_env = JniUtil::GetJNIEnv(); |
| |
| // Add a scoped cleanup jni reference object. This cleans up local refs made below. |
| JniLocalFrame jni_frame; |
| RETURN_IF_ERROR(jni_frame.push(jni_env)); |
| |
| jstring jar_path_jstr = jni_env->NewStringUTF(local_jar_path.c_str()); |
| RETURN_ERROR_IF_EXC(jni_env); |
| jstring class_name_jstr = jni_env->NewStringUTF(class_name.c_str()); |
| RETURN_ERROR_IF_EXC(jni_env); |
| jstring api_version_jstr = jni_env->NewStringUTF(api_version.c_str()); |
| RETURN_ERROR_IF_EXC(jni_env); |
| jstring init_string_jstr = jni_env->NewStringUTF(init_string.c_str()); |
| RETURN_ERROR_IF_EXC(jni_env); |
| |
| const JniState& s = JniState::GetInstance(); |
| jobject local_exec = jni_env->NewObject(s.executor_class_, s.ctor_, jar_path_jstr, |
| class_name_jstr, api_version_jstr, init_string_jstr); |
| RETURN_ERROR_IF_EXC(jni_env); |
| RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, local_exec, &executor_)); |
| RETURN_IF_ERROR(s.UpdateClassCacheMetrics()); |
| is_initialized_ = true; |
| return Status::OK(); |
| } |
| |
| // JniUtil::CallJniMethod() does not compile when the template parameters are in |
| // another namespace. The issue seems to be that SerializeThriftMsg/DeserializeThriftMsg |
| // are not being generated for these types. |
| // TODO: Understand what's happening, remove, and use JniUtil::CallJniMethod |
| template <typename T, typename R> |
| Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg, |
| R* response) { |
| JNIEnv* jni_env = JniUtil::GetJNIEnv(); |
| jbyteArray request_bytes; |
| JniLocalFrame jni_frame; |
| RETURN_IF_ERROR(jni_frame.push(jni_env)); |
| RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes)); |
| jbyteArray result_bytes = static_cast<jbyteArray>( |
| jni_env->CallObjectMethod(obj, method, request_bytes)); |
| RETURN_ERROR_IF_EXC(jni_env); |
| RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response)); |
| return Status::OK(); |
| } |
| |
| Status ExternalDataSourceExecutor::Open(const TOpenParams& params, TOpenResult* result) { |
| DCHECK(is_initialized_); |
| const JniState& s = JniState::GetInstance(); |
| return CallJniMethod(executor_, s.open_id_, params, result); |
| } |
| |
| Status ExternalDataSourceExecutor::GetNext(const TGetNextParams& params, |
| TGetNextResult* result) { |
| DCHECK(is_initialized_); |
| const JniState& s = JniState::GetInstance(); |
| return CallJniMethod(executor_, s.get_next_id_, params, result); |
| } |
| |
| Status ExternalDataSourceExecutor::Close(const TCloseParams& params, |
| TCloseResult* result) { |
| DCHECK(is_initialized_); |
| const JniState& s = JniState::GetInstance(); |
| Status status = CallJniMethod(executor_, s.close_id_, params, |
| result); |
| JNIEnv* env = JniUtil::GetJNIEnv(); |
| if (executor_ != NULL) env->DeleteGlobalRef(executor_); |
| is_initialized_ = false; |
| return status; |
| } |