blob: 00c1159a1403a45d76b0e4f82ee013e049e72632 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/lib-cache.h"
#include <boost/filesystem.hpp>
#include <boost/thread/locks.hpp>
#include "codegen/llvm-codegen.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-state.h"
#include "util/dynamic-util.h"
#include "util/hash-util.h"
#include "util/hdfs-util.h"
#include "util/path-builder.h"
#include "util/test-info.h"
#include "common/names.h"
// Shorthand for the Boost.Filesystem namespace, used when building local paths below.
namespace filesystem = boost::filesystem;
// Command-line flag naming the local directory that libraries fetched from HDFS are
// copied into. It is passed to MakeLocalPath() when a new cache entry is created.
DEFINE_string(local_library_dir, "/tmp",
"Local directory to copy UDF libraries from HDFS into");
namespace impala {
// Storage for the singleton LibCache. It is empty until LibCache::Init() installs an
// instance.
scoped_ptr<LibCache> LibCache::instance_;
// Bookkeeping for one library that has been (or is being) copied from HDFS to the local
// file system. Entries are created by LibCache::GetCacheEntryInternal() and destroyed
// once they have been marked for removal and their use_count has dropped to 0.
struct LibCacheEntry {
  // Lock protecting all fields in this entry
  boost::mutex lock;

  // The number of users that are using this cache entry. If this is
  // a .so, we can't dlclose unless the use_count goes to 0.
  int use_count;

  // If true, this cache entry should be removed from lib_cache_ when
  // the use_count goes to 0.
  bool should_remove;

  // If true, we need to check if there is a newer version of the cached library in HDFS
  // on next access. Should hold lock_ to read/write.
  bool check_needs_refresh;

  // The type of this file. Not set by the constructor; it is assigned by
  // GetCacheEntryInternal() right after the entry is created.
  LibCache::LibType type;

  // The path on the local file system for this library.
  std::string local_path;

  // Status returned from copying this file from HDFS.
  Status copy_file_status;

  // The last modification time of the HDFS file in seconds. Starts at 0 so that it
  // never holds an indeterminate value before the first successful stat of the file.
  time_t last_mod_time;

  // Handle from dlopen.
  void* shared_object_handle;

  // mapping from symbol => address of loaded symbol.
  // Only used if the type is TYPE_SO.
  typedef boost::unordered_map<std::string, void*> SymbolMap;
  SymbolMap symbol_cache;

  // Set of symbols in this entry. This is populated once on load and read
  // only. This is only used if it is a llvm module.
  // TODO: it would be nice to be able to do this for .so's as well but it's
  // not trivial to walk an .so for the symbol table.
  boost::unordered_set<std::string> symbols;

  // Set if an error occurs loading the cache entry before the cache entry
  // can be evicted. This allows other threads that attempt to use the entry
  // before it is removed to return the same error.
  Status loading_status;

  // Initializers are listed in member declaration order.
  LibCacheEntry()
    : use_count(0),
      should_remove(false),
      check_needs_refresh(false),
      last_mod_time(0),
      shared_object_handle(NULL) {}

  ~LibCacheEntry();
};
// Constructs an empty cache. The handle to the current process stays NULL until
// InitInternal() opens it.
LibCache::LibCache() : current_process_handle_(NULL) {}
// Drops every cached entry, then closes the handle to the current process if one was
// opened.
LibCache::~LibCache() {
  DropCache();
  if (current_process_handle_ == NULL) return;
  DynamicClose(current_process_handle_);
}
// Creates the singleton LibCache and initializes it. Must not be called when an
// instance already exists (checked in debug builds). Returns the status of
// InitInternal().
Status LibCache::Init() {
  DCHECK(LibCache::instance_.get() == NULL);
  LibCache* cache = new LibCache();
  // instance_ takes ownership of the new cache.
  LibCache::instance_.reset(cache);
  return cache->InitInternal();
}
// Opens the handle used to resolve symbols that live in the running process and stores
// it in current_process_handle_. Returns the error from DynamicOpen() on failure.
Status LibCache::InitInternal() {
  // A NULL path asks DynamicOpen() for the handle of the current process.
  const char* lib_path = NULL;
  string fe_support_path;
  if (TestInfo::is_fe_test()) {
    // In the FE tests, NULL gives the handle to the java process.
    // Explicitly load the fe-support shared object.
    PathBuilder::GetFullBuildPath("service/libfesupport.so", &fe_support_path);
    lib_path = fe_support_path.c_str();
  }
  RETURN_IF_ERROR(DynamicOpen(lib_path, &current_process_handle_));
  DCHECK(current_process_handle_ != NULL)
      << "We should always be able to get current process handle.";
  return Status::OK();
}
// Releases what the entry owns: the dlopen() handle, if one was opened, and the local
// copy of the library file.
LibCacheEntry::~LibCacheEntry() {
  void* const handle = shared_object_handle;
  if (handle != NULL) {
    // A loaded shared object may only be closed once it has been marked for removal
    // and nobody is using it any more.
    DCHECK_EQ(use_count, 0);
    DCHECK(should_remove);
    DynamicClose(handle);
  }
  // The result of unlink() is ignored.
  unlink(local_path.c_str());
}
// Resolves 'symbol' to a function pointer and stores it in '*fn_ptr'.
//
// An empty 'hdfs_lib_file' means the symbol is looked up in the current process.
// Otherwise the symbol is looked up in the shared object cached for 'hdfs_lib_file',
// and resolved addresses are remembered in the entry's symbol cache.
//
// 'ent' is optional. If '*ent' is already set, that entry is reused instead of
// consulting the cache. If 'ent' is non-NULL but '*ent' is NULL, '*ent' is set to the
// entry that was used and its use_count is incremented.
// 'quiet' is passed through to DynamicLookup().
Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
    void** fn_ptr, LibCacheEntry** ent, bool quiet) {
  if (hdfs_lib_file.empty()) {
    // Just loading a function ptr in the current process. No need to take any locks.
    DCHECK(current_process_handle_ != NULL);
    RETURN_IF_ERROR(
        DynamicLookup(current_process_handle_, symbol.c_str(), fn_ptr, quiet));
    return Status::OK();
  }

  const bool caller_has_entry = (ent != NULL && *ent != NULL);
  LibCacheEntry* entry = NULL;
  unique_lock<mutex> lock;
  if (caller_has_entry) {
    // Reuse already-cached entry provided by user
    entry = *ent;
    unique_lock<mutex> l(entry->lock);
    lock.swap(l);
  } else {
    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, TYPE_SO, &lock, &entry));
  }
  DCHECK(entry != NULL);
  DCHECK_EQ(entry->type, TYPE_SO);

  // Check the per-entry symbol cache first, and fall back to a dynamic lookup.
  LibCacheEntry::SymbolMap& resolved = entry->symbol_cache;
  LibCacheEntry::SymbolMap::iterator cached = resolved.find(symbol);
  if (cached == resolved.end()) {
    RETURN_IF_ERROR(
        DynamicLookup(entry->shared_object_handle, symbol.c_str(), fn_ptr, quiet));
    resolved[symbol] = *fn_ptr;
  } else {
    *fn_ptr = cached->second;
  }
  DCHECK(*fn_ptr != NULL);

  if (ent != NULL && !caller_has_entry) {
    // Only set and increment user's entry if it wasn't already cached
    *ent = entry;
    ++entry->use_count;
  }
  return Status::OK();
}
// Drops one reference to 'entry'. A NULL 'entry' is a no-op. The entry is deleted if
// this was the last user and the entry has been marked for removal.
void LibCache::DecrementUseCount(LibCacheEntry* entry) {
  if (entry == NULL) return;
  bool last_user_of_removed_entry;
  {
    unique_lock<mutex> lock(entry->lock);
    const int remaining = --entry->use_count;
    last_user_of_removed_entry = (remaining == 0 && entry->should_remove);
  }
  // Delete only after the entry lock is released, since the mutex lives inside the
  // entry.
  if (last_user_of_removed_entry) delete entry;
}
// Looks up (creating it if necessary) the cache entry for 'hdfs_lib_file' and copies
// its local file system path into '*local_path'. Returns the error from
// GetCacheEntry() on failure.
Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
    string* local_path) {
  LibCacheEntry* entry = NULL;
  unique_lock<mutex> lock;
  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
  DCHECK(entry != NULL);
  DCHECK_EQ(entry->type, type);
  // The entry lock is held for the duration of the copy.
  local_path->assign(entry->local_path);
  return Status::OK();
}
// Checks that 'symbol' is present in the library 'hdfs_lib_file' of the given 'type'.
//  - TYPE_SO: the symbol is resolved through GetSoFunctionPtr().
//  - TYPE_IR: the symbol is looked up in the set of symbols recorded for the module.
//  - TYPE_JAR: only verifies that the cache entry can be obtained.
// When 'quiet' is true a missing IR symbol is reported with Status::Expected().
Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
    const string& symbol, bool quiet) {
  switch (type) {
    case TYPE_SO: {
      void* dummy_ptr = NULL;
      return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet);
    }
    case TYPE_IR: {
      unique_lock<mutex> lock;
      LibCacheEntry* entry = NULL;
      RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
      DCHECK(entry != NULL);
      DCHECK_EQ(entry->type, TYPE_IR);
      const bool found = entry->symbols.find(symbol) != entry->symbols.end();
      if (found) return Status::OK();
      stringstream ss;
      ss << "Symbol '" << symbol << "' does not exist in module: " << hdfs_lib_file
         << " (local path: " << entry->local_path << ")";
      if (quiet) return Status::Expected(ss.str());
      return Status(ss.str());
    }
    case TYPE_JAR: {
      // TODO: figure out how to inspect contents of jars
      unique_lock<mutex> lock;
      LibCacheEntry* dummy_entry = NULL;
      return GetCacheEntry(hdfs_lib_file, type, &lock, &dummy_entry);
    }
    default:
      DCHECK(false);
      return Status("Shouldn't get here.");
  }
}
void LibCache::SetNeedsRefresh(const string& hdfs_lib_file) {
unique_lock<mutex> lib_cache_lock(lock_);
LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
if (it == lib_cache_.end()) return;
LibCacheEntry* entry = it->second;
unique_lock<mutex> entry_lock(entry->lock);
// Need to hold lock_ before setting check_needs_refresh.
entry->check_needs_refresh = true;
}
void LibCache::RemoveEntry(const string& hdfs_lib_file) {
unique_lock<mutex> lib_cache_lock(lock_);
LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
if (it == lib_cache_.end()) return;
RemoveEntryInternal(hdfs_lib_file, it);
}
// Erases the entry referenced by 'entry_iter' from lib_cache_ and marks it for
// removal. The entry is deleted right away if nobody is using it; otherwise it remains
// marked for removal and is deleted by DecrementUseCount() when the count reaches 0.
// 'hdfs_lib_file' is only used for logging.
void LibCache::RemoveEntryInternal(const string& hdfs_lib_file,
    const LibMap::iterator& entry_iter) {
  LibCacheEntry* const victim = entry_iter->second;
  VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file
          << ", local path: " << victim->local_path;
  bool unused;
  {
    unique_lock<mutex> entry_lock(victim->lock);
    // We have both locks so no other thread can be updating lib_cache_ or trying to
    // get the entry.
    lib_cache_.erase(entry_iter);
    victim->should_remove = true;
    DCHECK_GE(victim->use_count, 0);
    unused = (victim->use_count == 0);
    // Now that the entry is removed from the map, no future threads can find it, so
    // it is safe to release the entry lock at the end of this scope.
  }
  // Now that we've unlocked, we can delete this entry if no one is using it.
  if (unused) delete victim;
}
void LibCache::DropCache() {
unique_lock<mutex> lib_cache_lock(lock_);
for (LibMap::value_type& v: lib_cache_) {
bool can_delete = false;
{
// Lock to wait for any threads currently processing the entry.
unique_lock<mutex> entry_lock(v.second->lock);
v.second->should_remove = true;
DCHECK_GE(v.second->use_count, 0);
can_delete = v.second->use_count == 0;
}
VLOG(1) << "Removed lib cache entry: " << v.first;
if (can_delete) delete v.second;
}
lib_cache_.clear();
}
// Obtains the cache entry for 'hdfs_lib_file', loading it if necessary.
//
// On success '*entry' points at the entry and '*entry_lock' holds the entry's lock.
// On failure the error is returned. If an entry was handed back by
// GetCacheEntryInternal(), the error is recorded in its loading_status and the entry
// is removed from the cache.
Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
    unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
  Status load_status;
  {
    // The entry lock must be dropped before RemoveEntry() is called below: that call
    // takes the global lock_, which has to be acquired before any entry lock. Ending
    // this scope releases it.
    unique_lock<mutex> held_entry_lock;
    load_status =
        GetCacheEntryInternal(hdfs_lib_file, type, &held_entry_lock, entry);
    if (load_status.ok()) {
      // Hand the held lock over to the caller.
      entry_lock->swap(held_entry_lock);
      return load_status;
    }
    const bool have_failed_entry = (*entry != NULL);
    if (!have_failed_entry) return load_status;
    // Record the failure on the entry, so that another thread calling GetCacheEntry()
    // for this lib before we manage to acquire lock_ in RemoveEntry() returns the same
    // error.
    (*entry)->loading_status = load_status;
  }
  // Takes lock_
  RemoveEntry(hdfs_lib_file);
  return load_status;
}
// Finds the cache entry for 'hdfs_lib_file', or creates and loads a new one.
//
// On success '*entry' is the entry and '*entry_lock' holds its lock. On failure the
// error is returned and '*entry' is either:
//  - NULL, when the failure came from an entry that another thread failed to load, or
//    from the refresh check (in which case the stale entry has already been removed);
//  - non-NULL with '*entry_lock' held, when loading this entry failed. The caller is
//    then responsible for recording the error and removing the entry.
//
// Lock order: the global lock_ is always acquired before an entry lock.
Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type,
    unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
  DCHECK(!hdfs_lib_file.empty());
  *entry = NULL;

  // Check if this file is already cached or an error occurred on another thread while
  // loading the library.
  unique_lock<mutex> lib_cache_lock(lock_);
  LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
  if (it != lib_cache_.end()) {
    {
      unique_lock<mutex> local_entry_lock((it->second)->lock);
      if (!(it->second)->loading_status.ok()) {
        // If loading_status is already set, the returned *entry should be NULL.
        DCHECK(*entry == NULL);
        return (it->second)->loading_status;
      }
    }

    *entry = it->second;
    if ((*entry)->check_needs_refresh) {
      // Check if file has been modified since loading the cached copy. If so, remove the
      // cached entry and create a new one.
      (*entry)->check_needs_refresh = false;
      time_t last_mod_time;
      hdfsFS hdfs_conn;
      Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
      if (!status.ok()) {
        RemoveEntryInternal(hdfs_lib_file, it);
        *entry = NULL;
        return status;
      }
      status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
      if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
        RemoveEntryInternal(hdfs_lib_file, it);
        *entry = NULL;
      }
      RETURN_IF_ERROR(status);
    }
  }

  if (*entry != NULL) {
    // Take the entry lock BEFORE releasing lock_. If lock_ were released first, a
    // concurrent RemoveEntry() could erase and delete the entry in the window before
    // its lock is acquired here, and this thread would then lock freed memory.
    unique_lock<mutex> local_entry_lock((*entry)->lock);
    // Release the lib_cache_ lock. This guarantees other threads looking at other
    // libs can continue.
    lib_cache_lock.unlock();
    entry_lock->swap(local_entry_lock);

    RETURN_IF_ERROR((*entry)->copy_file_status);
    DCHECK_EQ((*entry)->type, type);
    DCHECK(!(*entry)->local_path.empty());
    return Status::OK();
  }

  // Entry didn't exist. Add the entry then release lock_ (so other libraries
  // can be accessed).
  *entry = new LibCacheEntry();

  // Grab the entry lock before adding it to lib_cache_. We still need to do more
  // work to initialize *entry and we don't want another thread to pick up
  // the uninitialized entry.
  unique_lock<mutex> local_entry_lock((*entry)->lock);
  entry_lock->swap(local_entry_lock);
  lib_cache_[hdfs_lib_file] = *entry;
  lib_cache_lock.unlock();

  // At this point we have the entry lock but not the lib cache lock.
  DCHECK(*entry != NULL);
  (*entry)->type = type;

  // Copy the file
  (*entry)->local_path = MakeLocalPath(hdfs_lib_file, FLAGS_local_library_dir);
  VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file
          << ", local path: " << (*entry)->local_path;

  hdfsFS hdfs_conn, local_conn;
  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn));
  RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn));

  // Note: the file can be updated between getting last_mod_time and copying the file to
  // local_path. This can only result in the file unnecessarily being refreshed, and does
  // not affect correctness.
  (*entry)->copy_file_status = GetLastModificationTime(
      hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
  RETURN_IF_ERROR((*entry)->copy_file_status);

  (*entry)->copy_file_status = CopyHdfsFile(
      hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
  RETURN_IF_ERROR((*entry)->copy_file_status);

  if (type == TYPE_SO) {
    // dlopen the local library
    RETURN_IF_ERROR(
        DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle));
  } else if (type == TYPE_IR) {
    // Load the module temporarily and populate all symbols.
    ObjectPool pool;
    scoped_ptr<LlvmCodeGen> codegen;
    string module_id = filesystem::path((*entry)->local_path).stem().string();
    RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(
        &pool, NULL, (*entry)->local_path, module_id, &codegen));
    codegen->GetSymbols(&(*entry)->symbols);
  } else {
    DCHECK_EQ(type, TYPE_JAR);
    // Nothing to do.
  }

  return Status::OK();
}
// Builds the local file name for a library copied from 'hdfs_path':
//   <local_dir>/<stem>.<pid>.<library number><extension>
// The library number comes from the num_libs_copied_ counter, which is incremented on
// every call.
string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir) {
  const filesystem::path src(hdfs_path);
  stringstream dst;
  dst << local_dir << "/" << src.stem().native();
  // Append the pid and library number to the local directory.
  dst << "." << getpid();
  dst << "." << (num_libs_copied_.Add(1) - 1);
  dst << src.extension().native();
  return dst.str();
}
}