blob: a70433f8bf63f75e046d36b227a8693034587df0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* This file contains code common to the ActiveMQ and WebSphere MQ
* implementations of the UIMA C++ service wrapper.
*/
#ifndef __DEPLOY_SERVICE__
#define __DEPLOY_SERVICE__
#include "uima/api.hpp"
#include <apr_network_io.h>
#include <apr_proc_mutex.h>
#include <apr_errno.h>
#include <apr_general.h>
#include <apr_thread_proc.h>
#include <apr_thread_cond.h>
#include <apr_signal.h>
class SocketLogger;
class Monitor;
class UimacppService;
class ServiceParameters;
/*
* Constants
*/
#define PROCESS_CAS_COMMAND 2000
#define GET_META_COMMAND 2001
#define CPC_COMMAND 2002
#define REQUEST 3000
#define RESPONSE 3001
#define XMI_PAYLOAD 1000
#define XCAS_PAYLOAD 1004
#define META_PAYLOAD 1002
#define EXC_PAYLOAD 1003
#define NO_PAYLOAD 1005
static char * getmeta_selector = "Command = 2001";
static char * annotator_selector = "Command = 2000 or Command = 2002";
static int initialize(ServiceParameters &, apr_pool_t*);
static void* APR_THREAD_FUNC handleCommands(apr_thread_t *thd, void *data);
static int terminateService();
static void signal_handler(int signum);
/**
* This class holds command line parameters that
* configure this service.
*/
class ServiceParameters {
public:
//Constructor
ServiceParameters() :
iv_aeDescriptor(), iv_brokerURL("tcp://localhost:61616"),
iv_queueName(), iv_numInstances(1),
iv_prefetchSize(1), iv_javaport(0),
iv_datapath(), iv_loglevel(0), iv_tracelevel(-1),
iv_errThreshhold(0), iv_errWindow(0), iv_terminateOnCPCError(false),
iv_mqHost("localhost"), iv_mqPort(1414), iv_mqChannel(), iv_mqQueueMgr(),
iv_user(), iv_password(), iv_initialfsheapsize(0),
iv_wpmEndpoints("localhost:7276:BootstrapBasicMessaging"),
iv_wpmBusname() {}
~ServiceParameters() {
}
void print() {
cout << asString() << endl;
}
string ServiceParameters::asString() {
stringstream str;
str << "AE descriptor " << iv_aeDescriptor
<< " Initial FSHeap size " << this->iv_initialfsheapsize
<< " Input Queue " << iv_queueName
<< " Num Instances " << iv_numInstances
<< " Prefetch size " << iv_prefetchSize
<< " ActiveMQ broker URL " << iv_brokerURL
<< " MQ host " << iv_mqHost
<< " MQ port " << iv_mqPort
<< " MQ qmgr " << iv_mqQueueMgr
<< " MQ channel " << iv_mqChannel
<< " WAS bootstrap service address " << iv_wpmEndpoints
<< " WAS SIB name " << iv_wpmBusname
<< " Java port " << iv_javaport
<< " logging level " << iv_loglevel
<< " trace level " << iv_tracelevel
<< " datapath " << iv_datapath << endl;
return str.str();
}
const string & getBrokerURL() const {
return iv_brokerURL;
}
const string & getQueueName() const {
return iv_queueName;
}
const string & getAEDescriptor() const {
return iv_aeDescriptor;
}
const int getTraceLevel() const {
return iv_tracelevel;
}
const int getLoggingLevel() const {
return iv_loglevel;
}
const int getNumberOfInstances() const {
return iv_numInstances;
}
const int getPrefetchSize() const {
return iv_prefetchSize;
}
const int getJavaPort() const {
return iv_javaport;
}
const string & getDataPath() {
return iv_datapath;
}
const int getErrorThreshhold() {
return iv_errThreshhold;
}
const int getErrorWindow() {
return iv_errWindow;
}
const bool terminatOnCPCError() {
return iv_terminateOnCPCError;
}
const string & getMQHost() {
return iv_mqHost;
}
const int getMQPort() {
return iv_mqPort;
}
const string & getMQChannel() {
return iv_mqChannel;
}
const string & getMQQueueMgr() {
return iv_mqQueueMgr;
}
const string & getUserName() {
return iv_user;
}
const string & getPassword() {
return iv_password;
}
const string & getWPMEndpoints() {
return iv_wpmEndpoints;
}
const string & getWPMBusname() {
return iv_wpmBusname;
}
const size_t getInitialFSHeapSize() {
return iv_initialfsheapsize;
}
void setBrokerURL(const string url) {
iv_brokerURL = url;
}
void setQueueName(const string qname) {
iv_queueName = qname;
}
void setAEDescriptor(const string loc) {
iv_aeDescriptor = loc;
}
void setNumberOfInstances(int num) {
iv_numInstances=num;
}
void setTraceLevel(int num) {
iv_tracelevel=num;
}
void setLoggingLevel(int num) {
iv_loglevel=num;
}
void setJavaPort(int val) {
iv_javaport = val;
}
void setDataPath(const string path) {
iv_datapath=path;
}
void parseArgs(int argc, char* argv[]) {
int index =0;
while (++index < argc) {
char * arg = argv[index];
if ( 0 == strcmp(arg, "-b") ) {
if (++index < argc) {
this->iv_brokerURL = argv[index];
}
} else if (0 == strcmp(arg, "-n")) {
if (++index < argc) {
this->iv_numInstances = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-fsheapsz")) {
if (++index < argc) {
this->iv_initialfsheapsize = atoi(argv[index]);
this->iv_initialfsheapsize = this->iv_initialfsheapsize/4;
}
} else if (0 == strcmp(arg, "-p")) {
if (++index < argc) {
this->iv_prefetchSize = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-t")) {
if (++index < argc) {
this->iv_tracelevel = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-l")) {
if (++index < argc) {
this->iv_loglevel = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-jport")) {
if (++index < argc) {
this->iv_javaport = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-d")) {
if (++index < argc) {
this->iv_datapath = argv[index];
}
} else if (0 == strcmp(arg, "-e")) {
if (++index < argc) {
this->iv_errThreshhold = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-w")) {
if (++index < argc) {
this->iv_errWindow = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-mqh")) {
if (++index < argc) {
this->iv_mqHost = argv[index];
}
} else if (0 == strcmp(arg, "-mqp")) {
if (++index < argc) {
this->iv_mqPort = atoi(argv[index]);
}
} else if (0 == strcmp(arg, "-mqc")) {
if (++index < argc) {
this->iv_mqChannel = argv[index];
}
} else if (0 == strcmp(arg, "-mqm")) {
if (++index < argc) {
this->iv_mqQueueMgr = argv[index];
}
} else if (0 == strcmp(arg, "-user")) {
if (++index < argc) {
this->iv_user = argv[index];
}
} else if (0 == strcmp(arg, "-pw")) {
if (++index < argc) {
this->iv_password = argv[index];
}
} else if (0 == strcmp(arg, "-wash")) {
if (++index < argc) {
this->iv_wpmEndpoints = argv[index];
}
} else if (0 == strcmp(arg, "-wasb")) {
if (++index < argc) {
this->iv_wpmBusname = argv[index];
}
} else if (0 == strcmp(arg, "-a")) {
if (++index < argc) {
if (stricmp(argv[index],"true")==0) {
this->iv_terminateOnCPCError = true;
} else {
this->iv_terminateOnCPCError=false;
}
}
} else {
if (this->iv_aeDescriptor.length() == 0) {
this->iv_aeDescriptor = argv[index];
} else {
if (this->iv_queueName.length() == 0) {
this->iv_queueName = argv[index];
} else {
cerr << "unexpected argument " << argv[index] << endl;
}
}
}
}
//This is a temporary fix to enable the service wrapper to
//connect to the WAS messaging engine.
if (this->iv_mqHost.find_first_of(":") != string::npos) {
this->iv_wpmEndpoints = iv_mqHost;
this->iv_mqHost = "";
this->iv_wpmBusname = this->iv_mqQueueMgr;
this->iv_mqQueueMgr = "";
}
}
string getServiceName() {
if (getWPMBusname().length() == 0) {
stringstream str;
str << getMQPort();
return getQueueName() + "@" + getMQHost() + ":" + str.str();
} else {
return getQueueName() + "@" + getWPMBusname();
}
}
private:
string iv_aeDescriptor;
//ActiveMQ
string iv_brokerURL;
string iv_queueName;
int iv_numInstances;
int iv_prefetchSize;
int iv_javaport;
string iv_datapath;
int iv_loglevel;
int iv_tracelevel;
int iv_errThreshhold;
int iv_errWindow;
bool iv_terminateOnCPCError;
//Websphere MQ
string iv_mqHost;
int iv_mqPort;
string iv_mqChannel;
string iv_mqQueueMgr;
string iv_user;
string iv_password;
size_t iv_initialfsheapsize;
//Websphere default messaging
string iv_wpmEndpoints;
string iv_wpmBusname;
};
/**
* Command line parameters required to start this service.
*/
class Monitor {
public:
//Constructor
Monitor (apr_pool_t * pool, string brokerURL,
string queueName, string aeDesc,
int numInstances, int prefetchSize, int processCasErrorThreshhold,
int processCasErrorWindow, bool terminateOnCPCError)
{
iv_brokerURL = brokerURL;
iv_queueName = queueName;
iv_aeDescriptor = aeDesc;
iv_numInstances = numInstances;
iv_prefetchSize = 0;
iv_cpcErrors = 0;
iv_getmetaErrors = 0;
iv_processcasErrors = 0;
iv_deserTime = 0;
iv_serTime = 0;
iv_annotatorTime = 0;
iv_startTime = apr_time_now();
iv_numCasProcessed = 0;
iv_errorThreshhold = processCasErrorThreshhold;
iv_errorWindow = processCasErrorWindow;
iv_terminateOnCPCError = terminateOnCPCError;
iv_getmetaId = -1;
iv_numRunning = 0;
mutex = 0;
lmutex = 0;
cond = 0;
cond_mutex = 0;
apr_status_t rv = apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool);
apr_thread_mutex_create(&cond_mutex,APR_THREAD_MUTEX_UNNESTED, pool);
apr_thread_mutex_create(&lmutex,APR_THREAD_MUTEX_UNNESTED, pool);
apr_thread_cond_create(&cond, pool);
}
void print();
void shutdown() {
//apr_thread_mutex_unlock(mutex);
// apr_thread_mutex_unlock(lmutex);
apr_thread_cond_signal(this->cond);
return ;
}
const string & getBrokerURL() const {
return iv_brokerURL;
}
const string & getQueueName() const {
return iv_queueName;
}
const string & getAEDescriptor() const {
return iv_aeDescriptor;
}
const int getNumberOfInstances() const {
return iv_numInstances;
}
void setBrokerURL(const string url) {
iv_brokerURL = url;
}
void setQueueName(const string qname) {
iv_queueName = qname;
}
void setAEDescriptor(const string loc) {
iv_aeDescriptor = loc;
}
void setNumberOfInstances(int num) {
iv_numInstances=num;
iv_numRunning = num;
}
void listenerStopped(int id) {
cout << "listener stopped " << id << endl;
if (id == iv_getmetaId || iv_numRunning == 1) {
shutdown();
} else {
apr_thread_mutex_lock(mutex);
iv_numRunning--;
apr_thread_mutex_unlock(mutex);
}
}
void setGetMetaListenerId(int id) {
iv_getmetaId = id;
}
void setStartTime() {
apr_thread_mutex_lock(mutex);
iv_startTime = apr_time_now();
apr_thread_mutex_unlock(mutex);
}
void logMessage(string message) {
apr_thread_mutex_lock(lmutex);
uima::ResourceManager::getInstance().getLogger().logMessage(message);
apr_thread_mutex_unlock(lmutex);
}
void logWarning(string message) {
apr_thread_mutex_lock(lmutex);
uima::ResourceManager::getInstance().getLogger().logWarning(message);
apr_thread_mutex_unlock(lmutex);
}
void logError(string message) {
apr_thread_mutex_lock(lmutex);
uima::ResourceManager::getInstance().getLogger().logError(message,-99);
apr_thread_mutex_unlock(lmutex);
}
void processingComplete(int command, bool success, apr_time_t totalTime,
apr_time_t deserTime=0,
apr_time_t analyticTime=0,
apr_time_t serTime=0) {
apr_thread_mutex_lock(mutex);
iv_messageProcessTime += totalTime;
if (command == PROCESS_CAS_COMMAND) {
iv_numCasProcessed++;
iv_deserTime += deserTime;
iv_annotatorTime += analyticTime;
iv_serTime += serTime;
} else if (command == GET_META_COMMAND) {
iv_getmetaTime += totalTime;
} else if (command == CPC_COMMAND) {
iv_cpcTime += totalTime;
}
if (!success) {
incrementErrorCount(command);
}
apr_thread_mutex_unlock(mutex);
}
void writeStatistics(apr_socket_t * cs) {
apr_thread_mutex_lock(mutex);
stringstream str;
apr_time_t totalProcessTime = (apr_time_now() - iv_startTime)*iv_numInstances;
// idle time in millis
long idleTime = (totalProcessTime-iv_messageProcessTime)/1000;
str << "NUMINSTANCES=" << iv_numInstances
<< " CPCERRORS=" << iv_cpcErrors
<< " GETMETAERRORS=" << iv_getmetaErrors
<< " PROCESSCASERRORS=" << iv_processcasErrors
<< " CPCTIME=" << iv_cpcTime/1000
<< " GETMETATIME=" << iv_getmetaTime/1000
<< " NUMCASPROCESSED=" << iv_numCasProcessed
<< " SERIALIZETIME=" << iv_serTime/1000
<< " DESERIALIZETIME=" << iv_deserTime/1000
<< " ANNOTATORTIME=" << iv_annotatorTime/1000
<< " MESSAGEPROCESSTIME=" << iv_messageProcessTime/1000
<< " IDLETIME=" << idleTime << endl;
apr_size_t len = str.str().length();
apr_status_t rv = apr_socket_send(cs, str.str().c_str(), &len);
len = 1;
apr_socket_send(cs,"\n", &len);
if (rv != APR_SUCCESS) {
//TODO throw exception
cout << "apr_socket_send() failed " << endl;
}
apr_thread_mutex_unlock(mutex);
}
void reset() {
apr_thread_mutex_lock(mutex);
// idle time in millis
this->iv_numCasProcessed=0;
this->iv_annotatorTime=0;
this->iv_deserTime=0;
this->iv_serTime=0;
this->iv_cpcTime=0;
this->iv_getmetaTime=0;
this->iv_messageProcessTime=0;
this->iv_startTime=apr_time_now();
this->iv_cpcErrors=0;
this->iv_getmetaErrors=0;
this->iv_processcasErrors=0;
apr_thread_mutex_unlock(mutex);
}
void printStatistics();
public:
apr_thread_mutex_t *cond_mutex;
apr_thread_cond_t *cond;
private:
apr_thread_mutex_t *mutex;
apr_thread_mutex_t *lmutex;
string iv_brokerURL;
string iv_queueName;
string iv_aeDescriptor;
int iv_numInstances;
int iv_prefetchSize;
long iv_cpcErrors;
long iv_getmetaErrors;
long iv_processcasErrors;
int iv_getmetaId;
int iv_numRunning;
apr_time_t iv_deserTime;
apr_time_t iv_serTime;
apr_time_t iv_annotatorTime;
apr_time_t iv_messageProcessTime;
apr_time_t iv_cpcTime;
apr_time_t iv_getmetaTime;
apr_time_t iv_startTime;
long iv_numCasProcessed;
int iv_errorThreshhold;
int iv_errorWindow;
bool iv_terminateOnCPCError;
void incrementErrorCount(int command) {
if (command == PROCESS_CAS_COMMAND) {
iv_processcasErrors++;
if (this->iv_errorThreshhold > 0 ) {
if ( this->iv_errorWindow == 0 && iv_processcasErrors >=
this->iv_errorThreshhold) {
cerr << " number of PROCESSCAS errors exceeded the threshhold. Terminating the service." << endl;
shutdown();
} else {
//TODO sliding window
}
}
} else if (command == GET_META_COMMAND) {
iv_getmetaErrors++;
cerr << "getMeta Error terminating the service." << endl;
shutdown();
} else if (command == CPC_COMMAND) {
iv_cpcErrors++;
if (this->iv_terminateOnCPCError) {
cerr << "CPC Error terminating the service." << endl;
shutdown();
}
}
}
};
/* SocketLogger */
class SocketLogger : public uima::Logger {
private:
apr_socket_t * socket;
apr_thread_mutex_t *mutex;
public:
SocketLogger(apr_socket_t * sock, apr_pool_t * pool) : socket(sock) {
apr_thread_mutex_create(&mutex,APR_THREAD_MUTEX_UNNESTED, pool);
}
virtual void log(uima::LogStream::EnEntryType entrytype,
string classname,
string methodname,
string message,
long lUserCode) {
//cout << "SocketLogger::log() " << message << endl;
apr_thread_mutex_lock(mutex);
apr_status_t rv;
stringstream str;
str << entrytype << " " << classname << " " << methodname;
if (entrytype == uima::LogStream::EnMessage) {
if (lUserCode != 0) {
str << " RC=" << lUserCode;
}
} else {
str << " RC=" << lUserCode;
}
str << " " << message << endl;
apr_size_t len = str.str().length();
rv = apr_socket_send(socket, str.str().c_str(), &len);
if (rv != APR_SUCCESS) {
//TODO throw exception
cerr << "apr_socket_send() failed " << endl;
}
apr_thread_mutex_unlock(mutex);
}
};
static SocketLogger * singleton_pLogger =0;
static Monitor * singleton_pMonitor=0;
//static ServiceParameters serviceDesc;
//static apr_pool_t *pool=0;
static apr_socket_t *s=0; //logger socket
static apr_socket_t *cs=0; //receive commands socket
static apr_sockaddr_t *sa=0;
static apr_status_t rv=0;
///static apr_thread_t *thread=0;
///static apr_threadattr_t *thd_attr=0;
static void signal_handler(int signum) {
stringstream str;
str << __FILE__ << __LINE__ << " Received Signal: " << signum;
cerr << str.str() << endl;
singleton_pMonitor->shutdown();
}
static int terminateService() {
cout << "deployCppService::terminateService" << endl;
if (cs) {
apr_socket_close(cs);
cs=0;
}
if (singleton_pMonitor) {
delete singleton_pMonitor;
singleton_pMonitor=0;
}
if (singleton_pLogger) {
uima::ResourceManager::getInstance().unregisterLogger(singleton_pLogger);
delete singleton_pLogger;
singleton_pLogger =0;
}
if (s) {
apr_socket_close(s);
s=0;
}
return 0;
}
static int initialize(ServiceParameters & serviceDesc, apr_pool_t * pool) {
if (serviceDesc.getLoggingLevel() == 0) {
uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnMessage);
} else if (serviceDesc.getLoggingLevel() == 1) {
uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnWarning);
} else if (serviceDesc.getLoggingLevel() == 2) {
uima::ResourceManager::getInstance().setLoggingLevel(uima::LogStream::EnError);
}
/*use only first path that exists */
if (serviceDesc.getDataPath().length() > 0) {
uima::util::Filename dataPath("");
dataPath.determinePath(serviceDesc.getDataPath().c_str());
uima::util::Location dataLocation(dataPath.getAsCString());
uima::ResourceManager::getInstance().setNewLocationData(dataLocation);
}
//register signal handler
apr_signal(SIGINT, signal_handler);
apr_signal(SIGTERM, signal_handler);
/* create object to collect JMX stats */
singleton_pMonitor = new Monitor(pool,
serviceDesc.getBrokerURL(),serviceDesc.getQueueName(),
serviceDesc.getAEDescriptor(), serviceDesc.getNumberOfInstances(),
serviceDesc.getPrefetchSize(),
serviceDesc.getErrorThreshhold(),
serviceDesc.getErrorWindow(),
serviceDesc.terminatOnCPCError());
/* set up connection to Java controller bean if a port is specified */
int javaport=serviceDesc.getJavaPort();
if (javaport > 0) {
//cout << "connecting to java controller port " << javaport << endl;
rv = apr_sockaddr_info_get(&sa, "localhost", APR_INET, javaport, 0, pool);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_sockaddr_info_get localhost at port " << javaport << endl;
return -2;
}
rv = apr_socket_create(&s, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_socket_create() logger connection at port " << javaport << endl;
return -3;
}
rv = apr_socket_connect(s, sa);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_socket_connect() logger connection at port " << javaport << endl;
return -4;
}
//commands connection
rv = apr_socket_create(&cs, sa->family, SOCK_STREAM, APR_PROTO_TCP, pool);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_socket_create() commands connection at port " << javaport << endl;
return -5;
}
rv = apr_socket_connect(cs, sa);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_socket_connect() commands connection at port " << javaport << endl;
return -6;
}
//register SocketLogger
singleton_pLogger = new SocketLogger(s,pool);
if (singleton_pLogger == NULL) {
cerr << "ERROR: SocketLogger() failed. " << endl;
return -7;
}
uima::ResourceManager::getInstance().registerLogger(singleton_pLogger);
cout << "deployCppService::initialize() Registered Java Logger." << endl;
}
return 0;
} //initialize
static void* APR_THREAD_FUNC handleCommands(apr_thread_t *thd, void *data) {
//if we are here service initialization was successful, send message
//to controller.
string msg = "0";
apr_size_t len = msg.length();
rv = apr_socket_send(cs, msg.c_str(), &len);
len = 1;
apr_socket_send(cs,"\n", &len);
//receive JMX, admin requests from controller
char buf[9];
memset(buf,0,9);
len = 8;
while ( (rv = apr_socket_recv(cs, buf, &len)) != APR_EOF) {
string command = buf;
memset(buf,0,9);
len=8;
//cout << "apr_socket_recv command=" << command << endl;
if (command.compare("GETSTATS")==0) {
//singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","retrieving stats",0);
singleton_pMonitor->writeStatistics(cs);
//singleton_pLogger->log(LogStream::EnMessage,"deployCppService","getStats","sent statistics",0);
} else if (command.compare("RESET")==0) {
singleton_pLogger->log(uima::LogStream::EnMessage,"deployCppService", "RESET",
"reset JMX statistics",0);
singleton_pMonitor->reset();
} else if (command.compare("SHUTDOWN")==0) {
singleton_pMonitor->shutdown();
break;
} else {
if (rv != APR_SUCCESS) {
singleton_pMonitor->shutdown();
break;
} else {
char * c = new char[256];
apr_strerror(rv, c, 255);
stringstream str;
str << c;
str << "deployCppService::handleCommand() aprerror=" << rv << " invalid command=" << command;
cerr << str.str() << endl;
delete c;
}
}
}
apr_thread_exit(thd, APR_SUCCESS);
cout << "deployCppService::handleCommand() calling shutdown. " << endl;
singleton_pMonitor->shutdown();
return NULL;
}
#endif