blob: 21857980f3a4cd285dd1f68d4850d8437dcc8ebc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <signal.h>
#include <string.h>
#ifndef __CYGWIN__
#include <execinfo.h>
#endif
#include "lib/commons.h"
#include "NativeTask.h"
#include "lib/NativeObjectFactory.h"
#include "lib/NativeLibrary.h"
#include "lib/BufferStream.h"
#include "util/StringUtil.h"
#include "util/SyncUtils.h"
#include "util/WritableUtils.h"
#include "handler/BatchHandler.h"
#include "handler/MCollectorOutputHandler.h"
#include "handler/CombineHandler.h"
using namespace NativeTask;
// TODO: just for debug, should be removed
/**
 * Debug-only signal handler.
 *
 * Writes the signal number to stderr, dumps up to ten stack frames to file
 * descriptor 2 (skipped when built for Cygwin), then ends the process with
 * exit status 1.
 *
 * @param sig number of the signal being handled
 */
extern "C" void handler(int sig) {
  // print out all the frames to stderr
  fprintf(stderr, "Error: signal %d:\n", sig);
#ifndef __CYGWIN__
  // get void*'s for all entries on the stack
  void *frames[10];
  size_t frameCount = backtrace(frames, 10);
  backtrace_symbols_fd(frames, frameCount, 2);
#endif
  exit(1);
}
// Definition of the built-in "NativeTask" library: registers the three handler
// classes below and names "NativeTask.BatchHandler" as the default class for
// BatchHandlerType.
// NOTE(review): the expansions of DEFINE_NATIVE_LIBRARY / REGISTER_CLASS are not
// visible in this file; presumably they provide the NativeTaskInit() and
// NativeTaskGetObjectCreator symbols that NativeObjectFactory::Init() uses — confirm.
DEFINE_NATIVE_LIBRARY(NativeTask) {
REGISTER_CLASS(BatchHandler, NativeTask);
REGISTER_CLASS(CombineHandler, NativeTask);
REGISTER_CLASS(MCollectorOutputHandler, NativeTask);
NativeObjectFactory::SetDefaultClass(BatchHandlerType, "NativeTask.BatchHandler");
}
namespace NativeTask {
// Configuration instance that GlobalConfig points at.
static Config G_CONFIG;
// Registered libraries; the lookup functions in this file scan it from back to front.
vector<NativeLibrary *> NativeObjectFactory::Libraries;
// Class name to instantiate for each NativeObjectType (see SetDefaultClass / CreateDefaultObject).
map<NativeObjectType, string> NativeObjectFactory::DefaultClasses;
// Configuration returned by GetConfig() / GetConfigPtr().
Config * NativeObjectFactory::GlobalConfig = &G_CONFIG;
// Last progress value obtained from TaskProgress; returned as-is while no source is set.
float NativeObjectFactory::LastProgress = 0;
// Optional progress source installed through SetTaskProgressSource().
Progress * NativeObjectFactory::TaskProgress = NULL;
// Status text set by SetTaskStatus(); cleared once written by GetTaskStatusUpdate().
string NativeObjectFactory::LastStatus;
// Registered counters ordered by CounterPtrCompare; GetCounter() uses it to find existing ones.
set<Counter *, CounterPtrCompare> NativeObjectFactory::CounterSet;
// Counters in registration order; element i pairs with CounterLastUpdateValues[i].
vector<Counter *> NativeObjectFactory::Counters;
// Value of each counter at the time it was last reported by GetTaskStatusUpdate().
vector<uint64_t> NativeObjectFactory::CounterLastUpdateValues;
// True between a successful start of Init() and the next Release().
bool NativeObjectFactory::Inited = false;
// Held by Init() and Release().
static Lock FactoryLock;
/**
 * One-time initialization of the factory, performed under FactoryLock.
 *
 * Selects the log device from NATIVE_LOG_DEVICE ("stdout", "stderr" or a file
 * path), runs NativeTaskInit(), registers the built-in library and then loads
 * every "name=path" pair listed (comma separated) in
 * NATIVE_CLASS_LIBRARY_BUILDIN.
 *
 * @return true when already initialized or initialization completed;
 *         false when one of the configured extra libraries failed to load.
 *
 * NOTE(review): Inited is set before the extra libraries are loaded (the
 * RegisterLibrary() call below goes through CheckInit()), so after a failed
 * load it stays true and later CheckInit() calls will not retry.
 */
bool NativeObjectFactory::Init() {
  ScopeLock<Lock> autolocak(FactoryLock);
  if (Inited == false) {
    // setup log device
    string device = GetConfig().get(NATIVE_LOG_DEVICE, "stderr");
    if (device == "stdout") {
      LOG_DEVICE = stdout;
    } else if (device == "stderr") {
      LOG_DEVICE = stderr;
    } else {
      FILE * logFile = fopen(device.c_str(), "w");
      if (logFile != NULL) {
        LOG_DEVICE = logFile;
      } else {
        // fopen failed: never leave LOG_DEVICE NULL, every later LOG() would
        // write through a null stream.
        LOG_DEVICE = stderr;
        LOG("[NativeObjectLibrary] Cannot open log device [%s], fall back to stderr",
            device.c_str());
      }
    }
    NativeTaskInit();
    NativeLibrary * library = new NativeLibrary("libnativetask.so", "NativeTask");
    library->_getObjectCreatorFunc = NativeTaskGetObjectCreator;
    Libraries.push_back(library);
    // Must be set before RegisterLibrary() is called below, because that
    // function calls CheckInit() and would otherwise re-enter Init().
    Inited = true;
    // load extra user provided libraries
    string libraryConf = GetConfig().get(NATIVE_CLASS_LIBRARY_BUILDIN, "");
    if (libraryConf.length() > 0) {
      vector<string> libraries;
      vector<string> pair;
      StringUtil::Split(libraryConf, ",", libraries, true);
      for (size_t i = 0; i < libraries.size(); i++) {
        pair.clear();
        StringUtil::Split(libraries[i], "=", pair, true);
        if (pair.size() == 2) {
          string & name = pair[0];
          string & path = pair[1];
          LOG("[NativeObjectLibrary] Try to load library [%s] with file [%s]", name.c_str(),
              path.c_str());
          if (false == RegisterLibrary(path, name)) {
            LOG("[NativeObjectLibrary] RegisterLibrary failed: name=%s path=%s", name.c_str(),
                path.c_str());
            return false;
          } else {
            LOG("[NativeObjectLibrary] RegisterLibrary success: name=%s path=%s", name.c_str(),
                path.c_str());
          }
        } else {
          // Malformed entries are reported and skipped, not treated as fatal.
          LOG("[NativeObjectLibrary] Illegal native.class.libray: [%s] in [%s]",
              libraries[i].c_str(), libraryConf.c_str());
        }
      }
    }
    const char * version = GetConfig().get(NATIVE_HADOOP_VERSION);
    LOG("[NativeObjectLibrary] NativeTask library initialized with hadoop %s",
        version == NULL ? "unkown" : version);
  }
  return true;
}
void NativeObjectFactory::Release() {
ScopeLock<Lock> autolocak(FactoryLock);
for (ssize_t i = Libraries.size() - 1; i >= 0; i--) {
delete Libraries[i];
Libraries[i] = NULL;
}
Libraries.clear();
for (size_t i = 0; i < Counters.size(); i++) {
delete Counters[i];
}
Counters.clear();
if (LOG_DEVICE != stdout && LOG_DEVICE != stderr) {
fclose(LOG_DEVICE);
LOG_DEVICE = stderr;
}
Inited = false;
}
void NativeObjectFactory::CheckInit() {
if (Inited == false) {
if (!Init()) {
throw new IOException("Init NativeTask library failed.");
}
}
}
/**
 * @return reference to the configuration object GlobalConfig points at.
 */
Config & NativeObjectFactory::GetConfig() {
  Config * const current = GlobalConfig;
  return *current;
}
/**
 * @return the GlobalConfig pointer itself.
 */
Config * NativeObjectFactory::GetConfigPtr() {
  return NativeObjectFactory::GlobalConfig;
}
/**
 * Installs the object that GetTaskProgress() queries for the current progress.
 *
 * @param progress progress source to store; the pointer is kept as given
 */
void NativeObjectFactory::SetTaskProgressSource(Progress * progress) {
  NativeObjectFactory::TaskProgress = progress;
}
/**
 * Returns the task progress.
 *
 * When a progress source is installed its current value is fetched and cached
 * in LastProgress; otherwise the previously cached value is returned.
 */
float NativeObjectFactory::GetTaskProgress() {
  if (TaskProgress == NULL) {
    // No source installed: report whatever was cached last.
    return LastProgress;
  }
  LastProgress = TaskProgress->getProgress();
  return LastProgress;
}
/**
 * Replaces the pending status text that the next GetTaskStatusUpdate() will
 * report.
 *
 * @param status new status text; copied into LastStatus
 */
void NativeObjectFactory::SetTaskStatus(const string & status) {
  LastStatus.assign(status);
}
// Held by GetTaskStatusUpdate() and GetCounter() while they access the counter containers.
static Lock CountersLock;
/**
 * Serializes the current task status into statusData and resets the pending
 * status text.
 *
 * Layout written, in order:
 *   progress : float
 *   status   : Text
 *   number of counters : int
 *   per counter: group : Text, name : Text, increment since last report : long
 *
 * Each counter's baseline in CounterLastUpdateValues is advanced to the value
 * just reported.
 *
 * @param statusData string that receives the encoded update
 */
void NativeObjectFactory::GetTaskStatusUpdate(string & statusData) {
  OutputStringStream os(statusData);

  float progress = GetTaskProgress();
  WritableUtils::WriteFloat(&os, progress);
  WritableUtils::WriteText(&os, LastStatus);
  LastStatus.clear();

  {
    // The counter section is written while holding CountersLock.
    ScopeLock<Lock> AutoLock(CountersLock);
    uint32_t total = (uint32_t)Counters.size();
    WritableUtils::WriteInt(&os, total);
    for (size_t idx = 0; idx != total; ++idx) {
      Counter * current = Counters[idx];
      uint64_t latest = current->get();
      // Report only the change since the previous update, then move the baseline.
      uint64_t delta = latest - CounterLastUpdateValues[idx];
      CounterLastUpdateValues[idx] = latest;
      WritableUtils::WriteText(&os, current->group());
      WritableUtils::WriteText(&os, current->name());
      WritableUtils::WriteLong(&os, delta);
    }
  }
}
/**
 * Returns the counter registered for (group, name), creating and registering
 * it on first use. Runs under CountersLock.
 *
 * A new counter is appended to Counters together with a zero baseline in
 * CounterLastUpdateValues, and inserted into CounterSet for later lookups.
 *
 * @param group counter group
 * @param name  counter name
 * @return the existing or newly created counter; the pointer is stored in the
 *         factory's counter containers
 */
Counter * NativeObjectFactory::GetCounter(const string & group, const string & name) {
  ScopeLock<Lock> AutoLock(CountersLock);
  // Temporary key object used only for the lookup.
  Counter tmpCounter(group, name);
  // Use the iterator type of the actual container (custom comparator included)
  // rather than set<Counter *>::iterator, which is a different type in principle.
  set<Counter *, CounterPtrCompare>::iterator itr = CounterSet.find(&tmpCounter);
  if (itr != CounterSet.end()) {
    return *itr;
  }
  Counter * ret = new Counter(group, name);
  Counters.push_back(ret);
  CounterLastUpdateValues.push_back(0);
  CounterSet.insert(ret);
  return ret;
}
/**
 * Records func as the creator function for class name clz in
 * NativeTaskClassMap__.
 *
 * NOTE(review): NativeTaskClassMap__ is not declared in this file; presumably
 * it is the class map produced by DEFINE_NATIVE_LIBRARY(NativeTask) — confirm.
 *
 * @param clz  class name used as the key
 * @param func creator function stored under that key
 */
void NativeObjectFactory::RegisterClass(const string & clz, ObjectCreatorFunc func) {
NativeTaskClassMap__[clz] = func;
}
/**
 * Instantiates the class named clz using the creator found by
 * GetObjectCreator().
 *
 * @param clz class name to instantiate
 * @return the object produced by the creator, or NULL when no creator is found
 */
NativeObject * NativeObjectFactory::CreateObject(const string & clz) {
  ObjectCreatorFunc make = GetObjectCreator(clz);
  if (make) {
    return make();
  }
  return NULL;
}
/**
 * Looks up funcName in the registered libraries, most recently registered
 * first, after making sure the factory is initialized.
 *
 * @param funcName name passed to each library's getFunction()
 * @return the first non-NULL result, or NULL when no library provides it
 */
void * NativeObjectFactory::GetFunction(const string & funcName) {
  CheckInit();
  // Walk from the back so later registrations take precedence.
  for (size_t remaining = Libraries.size(); remaining > 0; remaining--) {
    void * found = Libraries[remaining - 1]->getFunction(funcName);
    if (found != NULL) {
      return found;
    }
  }
  return NULL;
}
/**
 * Finds the creator function for class clz, asking the registered libraries
 * from the most recently registered to the first, after making sure the
 * factory is initialized.
 *
 * @param clz class name passed to each library's getObjectCreator()
 * @return the first non-NULL creator, or NULL when no library knows the class
 */
ObjectCreatorFunc NativeObjectFactory::GetObjectCreator(const string & clz) {
  CheckInit();
  vector<NativeLibrary*>::reverse_iterator pos = Libraries.rbegin();
  while (pos != Libraries.rend()) {
    ObjectCreatorFunc creator = (*pos)->getObjectCreator(clz);
    if (creator != NULL) {
      return creator;
    }
    pos++;
  }
  return NULL;
}
/**
 * Destroys obj with delete. Passing NULL does nothing.
 *
 * @param obj object to destroy
 */
void NativeObjectFactory::ReleaseObject(NativeObject * obj) {
  if (obj == NULL) {
    return;
  }
  delete obj;
}
bool NativeObjectFactory::RegisterLibrary(const string & path, const string & name) {
CheckInit();
{
NativeLibrary * library = new NativeLibrary(path, name);
bool ret = library->init();
if (!ret) {
delete library;
return false;
}
Libraries.push_back(library);
return true;
}
}
// Held by SetDefaultClass() while it updates DefaultClasses.
static Lock DefaultClassesLock;
/**
 * Sets (or overwrites) the default class name for an object type, under
 * DefaultClassesLock.
 *
 * @param type object type whose default is being set
 * @param clz  class name to use for that type
 */
void NativeObjectFactory::SetDefaultClass(NativeObjectType type, const string & clz) {
  ScopeLock<Lock> guard(DefaultClassesLock);
  string & slot = DefaultClasses[type];
  slot = clz;
}
/**
 * Creates an instance of the default class registered for type.
 *
 * The class name is looked up once, while holding DefaultClassesLock (the
 * same lock SetDefaultClass() takes); the lock is released before
 * CreateObject() runs so no factory code executes while it is held.
 *
 * @param type object type whose default class should be instantiated
 * @return the object returned by CreateObject(), or NULL (after logging) when
 *         no default class is registered for type
 */
NativeObject * NativeObjectFactory::CreateDefaultObject(NativeObjectType type) {
  CheckInit();
  string clz;
  bool found = false;
  {
    ScopeLock<Lock> guard(DefaultClassesLock);
    // Single lookup instead of find() followed by operator[].
    map<NativeObjectType, string>::iterator itr = DefaultClasses.find(type);
    if (itr != DefaultClasses.end()) {
      clz = itr->second;
      found = true;
    }
  }
  if (found) {
    return CreateObject(clz);
  }
  LOG("[NativeObjectLibrary] Default class for NativeObjectType %s not found",
      NativeObjectTypeToString(type).c_str());
  return NULL;
}
/**
 * Compares two byte sequences: first over their common prefix using
 * fmemcmp(), then, when the prefixes are equal, by length (shorter first).
 *
 * @param src        first sequence
 * @param srcLength  length of src in bytes
 * @param dest       second sequence
 * @param destLength length of dest in bytes
 * @return negative, zero or positive when src orders before, equal to or
 *         after dest
 */
int NativeObjectFactory::BytesComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  uint32_t minlen = std::min(srcLength, destLength);
  int64_t ret = fmemcmp(src, dest, minlen);
  if (ret > 0) {
    return 1;
  } else if (ret < 0) {
    return -1;
  }
  // Compare the lengths explicitly: an unsigned subtraction narrowed to int
  // yields the wrong sign once the lengths differ by 2^31 or more.
  if (srcLength == destLength) {
    return 0;
  }
  return srcLength < destLength ? -1 : 1;
}
/**
 * Compares the first byte of src with the first byte of dest.
 * The length arguments are not used.
 *
 * @return the difference of the two char values after promotion to int
 */
int NativeObjectFactory::ByteComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  int lhs = src[0];
  int rhs = dest[0];
  return lhs - rhs;
}
/**
 * Compares two 4-byte values stored at src and dest.
 *
 * The leading bytes are compared first as char values; only when they are
 * equal are the full 32-bit values byte-swapped with bswap() and compared as
 * unsigned integers. The length arguments are not used.
 * NOTE(review): the leading-byte comparison depends on the signedness of
 * plain char on the target platform — confirm for non-x86 builds.
 *
 * @return the leading-byte difference when those bytes differ, otherwise
 *         1, 0 or -1
 */
int NativeObjectFactory::IntComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  int result = (*src) - (*dest);
  if (result == 0) {
    // Copy the bytes out instead of dereferencing a casted pointer: the key
    // buffers carry no alignment guarantee visible here.
    uint32_t rawFrom;
    uint32_t rawTo;
    memcpy(&rawFrom, src, sizeof(rawFrom));
    memcpy(&rawTo, dest, sizeof(rawTo));
    uint32_t from = bswap(rawFrom);
    uint32_t to = bswap(rawTo);
    if (from > to) {
      return 1;
    } else if (from == to) {
      return 0;
    } else {
      return -1;
    }
  }
  return result;
}
/**
 * Compares two 8-byte values stored at src and dest.
 *
 * The leading bytes are compared first as char values; only when they are
 * equal are the full 64-bit values byte-swapped with bswap64() and compared
 * as unsigned integers. The length arguments are not used.
 * NOTE(review): the leading-byte comparison depends on the signedness of
 * plain char on the target platform — confirm for non-x86 builds.
 *
 * @return the leading-byte difference when those bytes differ, otherwise
 *         1, 0 or -1
 */
int NativeObjectFactory::LongComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  int result = (int)(*src) - (int)(*dest);
  if (result == 0) {
    // Copy the bytes out instead of dereferencing a casted pointer: the key
    // buffers carry no alignment guarantee visible here.
    uint64_t rawFrom;
    uint64_t rawTo;
    memcpy(&rawFrom, src, sizeof(rawFrom));
    memcpy(&rawTo, dest, sizeof(rawTo));
    uint64_t from = bswap64(rawFrom);
    uint64_t to = bswap64(rawTo);
    if (from > to) {
      return 1;
    } else if (from == to) {
      return 0;
    } else {
      return -1;
    }
  }
  return result;
}
/**
 * Decodes one value from each buffer with WritableUtils::ReadVInt() and
 * compares the two as signed 32-bit integers.
 *
 * @return 1, 0 or -1 when the value in src is greater than, equal to or less
 *         than the value in dest
 */
int NativeObjectFactory::VIntComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  int32_t lhs = WritableUtils::ReadVInt(src, srcLength);
  int32_t rhs = WritableUtils::ReadVInt(dest, destLength);
  if (lhs == rhs) {
    return 0;
  }
  return (lhs > rhs) ? 1 : -1;
}
/**
 * Decodes one value from each buffer with WritableUtils::ReadVLong() and
 * compares the two as signed 64-bit integers.
 *
 * @return 1, 0 or -1 when the value in src is greater than, equal to or less
 *         than the value in dest
 */
int NativeObjectFactory::VLongComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  int64_t lhs = WritableUtils::ReadVLong(src, srcLength);
  int64_t rhs = WritableUtils::ReadVLong(dest, destLength);
  if (lhs == rhs) {
    return 0;
  }
  return (lhs > rhs) ? 1 : -1;
}
/**
 * Compares two 4-byte buffers as floats after byte-swapping each with
 * bswap().
 *
 * @return -1, 0 or 1 when the value in src is less than, equal to or
 *         otherwise ordered relative to the value in dest (any comparison
 *         that is neither "<" nor "==" yields 1)
 * @throws IOException (via THROW_EXCEPTION_EX) when either length is not 4
 */
int NativeObjectFactory::FloatComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  if (srcLength != 4 || destLength != 4) {
    THROW_EXCEPTION_EX(IOException, "float comparator, while src/dest lengt is not 4");
  }
  // Load and reinterpret through memcpy: casting the buffer to uint32_t* and
  // the integer to float* reads objects through an incompatible type and
  // assumes alignment the buffers do not promise.
  uint32_t rawFrom;
  uint32_t rawTo;
  memcpy(&rawFrom, src, sizeof(rawFrom));
  memcpy(&rawTo, dest, sizeof(rawTo));
  uint32_t from = bswap(rawFrom);
  uint32_t to = bswap(rawTo);
  float srcValue;
  float destValue;
  memcpy(&srcValue, &from, sizeof(srcValue));
  memcpy(&destValue, &to, sizeof(destValue));
  if (srcValue < destValue) {
    return -1;
  } else if (srcValue == destValue) {
    return 0;
  } else {
    return 1;
  }
}
/**
 * Compares two 8-byte buffers as doubles after byte-swapping each with
 * bswap64().
 *
 * @return -1, 0 or 1 when the value in src is less than, equal to or
 *         otherwise ordered relative to the value in dest (any comparison
 *         that is neither "<" nor "==" yields 1)
 * @throws IOException (via THROW_EXCEPTION_EX) when either length is not 8
 */
int NativeObjectFactory::DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
    uint32_t destLength) {
  if (srcLength != 8 || destLength != 8) {
    // The check is for 8 bytes; the message previously claimed 4.
    THROW_EXCEPTION_EX(IOException, "double comparator, while src/dest length is not 8");
  }
  // Load and reinterpret through memcpy: casting the buffer to uint64_t* and
  // the integer to double* reads objects through an incompatible type and
  // assumes alignment the buffers do not promise.
  uint64_t rawFrom;
  uint64_t rawTo;
  memcpy(&rawFrom, src, sizeof(rawFrom));
  memcpy(&rawTo, dest, sizeof(rawTo));
  uint64_t from = bswap64(rawFrom);
  uint64_t to = bswap64(rawTo);
  double srcValue;
  double destValue;
  memcpy(&srcValue, &from, sizeof(srcValue));
  memcpy(&destValue, &to, sizeof(destValue));
  if (srcValue < destValue) {
    return -1;
  } else if (srcValue == destValue) {
    return 0;
  } else {
    return 1;
  }
}
/**
 * Selects the comparator function for a key type.
 *
 * When comparatorName is given, the function of that name is looked up
 * through NativeObjectFactory::GetFunction() and returned (possibly NULL);
 * keyType is ignored in that case. Otherwise the built-in comparator matching
 * keyType is returned.
 *
 * @param keyType        key type used to pick a built-in comparator
 * @param comparatorName name of a custom comparator function, or NULL
 * @return the chosen comparator, or NULL when keyType has no built-in
 *         comparator or the named function is not found
 */
ComparatorPtr get_comparator(const KeyValueType keyType, const char * comparatorName) {
  if (NULL != comparatorName) {
    // Custom comparator requested: resolve it by name from the loaded libraries.
    void * custom = NativeObjectFactory::GetFunction(string(comparatorName));
    return (ComparatorPtr)custom;
  }

  if (keyType == BytesType || keyType == TextType) {
    return &NativeObjectFactory::BytesComparator;
  }
  if (keyType == ByteType || keyType == BoolType) {
    return &NativeObjectFactory::ByteComparator;
  }
  if (keyType == IntType) {
    return &NativeObjectFactory::IntComparator;
  }
  if (keyType == LongType) {
    return &NativeObjectFactory::LongComparator;
  }
  if (keyType == FloatType) {
    return &NativeObjectFactory::FloatComparator;
  }
  if (keyType == DoubleType) {
    return &NativeObjectFactory::DoubleComparator;
  }
  if (keyType == VIntType) {
    return &NativeObjectFactory::VIntComparator;
  }
  if (keyType == VLongType) {
    return &NativeObjectFactory::VLongComparator;
  }
  return NULL;
}
} // namespace NativeTask