blob: afdf96c26e9508b51e8e090232d75efa6cf84155 [file] [log] [blame]
/*------------------------------------------------------------------------
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
--------------------------------------------------------------------------
Test driver that reads text files or XCASs or XMIs and calls the annotator
-------------------------------------------------------------------------- */
/* ----------------------------------------------------------------------- */
/* Include dependencies */
/* ----------------------------------------------------------------------- */
#include <stdio.h>
#include <errno.h>
#include <sys/stat.h>
#include "xercesc/framework/LocalFileInputSource.hpp"
#include "xercesc/util/PlatformUtils.hpp"
#include "xercesc/util/XMLString.hpp"
#include "uima/pragmas.hpp"
#include "uima/filename.hpp"
#include "uima/dirwalk.hpp"
#include "uima/api.hpp"
#include "uima/xmiwriter.hpp"
#include "uima/xcasdeserializer.hpp"
#include "uima/xmideserializer.hpp"
#include <apr_portable.h>
#include <apr_thread_proc.h>
using namespace uima;
/* ----------------------------------------------------------------------- */
/* Implementation */
/* ----------------------------------------------------------------------- */
/* Little helper routine to check and report errors.
This routine just does a hard program exit for any failure! */
static void tafCheckError(TyErrorId utErrorId,
const AnalysisEngine & crEngine);
static void tafCheckError(ErrorInfo const &);
void processInputFiles(AnalysisEngine * pEngine);
// input file or directory
std::string in;
// output directory
std::string out;
//AE descriptor filename
const char* cnfg = NULL;
// sofa to use for creating a tcas
bool useSofa, lenient;
const char* sofaName;
//logging level
int loglevel;
//num iterations over input data.
int numruns;
//num annotator instances
int numinstances;
//randomize input and delay between call to process.
bool randomize;
long rdelay;
// input data types
enum dataFormats { textFormat, xcasFormat, xmiFormat };
dataFormats xcasInput;
void process (AnalysisEngine * pEngine, CAS * cas, std::string in, std::string out);
void writeXmi (CAS & outCas, int num, std::string in, std::string outfn);
void tell() {
cerr << "Usage: runAECpp UimaCppDescriptor <-x> InputFileOrDir <OutputDir>" << endl
<< " <-s Sofa> <-l LogLevel>" << endl;
cerr << " UimaCppDescriptor Analysis Engine descriptor for a CPP annotator" << endl;
cerr << " InputFileOrDir Input file or directory of files to process" << endl;
cerr << " OutputDir Existing directory for Xmi outputs (optional)" << endl;
cerr << " Options:" << endl;
cerr << " -x [-xmi] Input(s) must be in XCAS [XMI] format (default is raw text)" << endl;
cerr << " -lenient For -xmi, ignore unknown types & features" << endl;
cerr << " -s Sofa Name of a Sofa to process (input must be an XCAS or XMI)" << endl;
cerr << " -l logLevel Set to 0, 1, or 2 for Message, Warning, or Error" << endl;
cerr << " -n numInstances number of annotator instances each running in a separate thread" << endl;
cerr << " -r numRuns number of iterations over the same input." << endl;
cerr << " -rand randomize the selection on next input to process." << endl;
cerr << " -rdelay Max add random delay between 0 and Max milliseconds between calls to process." << endl;
}
//=================================================================
//
//Annotator instance threads that process input.
//-----------------------------------------------------------------
static void* APR_THREAD_FUNC process(apr_thread_t *thd, void *data) {
cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing... starting thread." << endl;
processInputFiles((AnalysisEngine *) data);
apr_thread_exit(thd, APR_SUCCESS);
return NULL;
}
int main(int argc, char * argv[]) {
try {
/* Access the command line arguments to get the name of the input text. */
//if (argc != 3 && argc != 5 && argc != 7 && argc != 9 && argc != 11) {
if (argc < 3) {
tell();
return 1;
}
useSofa = false;
lenient = false;
xcasInput = textFormat;
//std::string sofa;
//std::string pattern("*");
cnfg = NULL;
loglevel = -1;
numinstances = 1;
numruns = 1;
randomize = false;
rdelay = 0;
int index = 0;
while (++index < argc) {
char* arg = argv[index];
if (0 == strcmp(arg, "-x")) {
xcasInput = xcasFormat;
} else if (0 == strcmp(arg, "-xcas")) {
xcasInput = xcasFormat;
} else if (0 == strcmp(arg, "-xmi")) {
xcasInput = xmiFormat;
} else if (0 == strcmp(arg, "-lenient")) {
lenient = true;
} else if (0 == strcmp(arg, "-s")) {
if ( ++index < argc ) {
sofaName = argv[index];
useSofa = true;
}
} else if (0 == strcmp(arg, "-l")) {
if ( ++index < argc ) {
loglevel = atoi(argv[index]);
if (loglevel < LogStream::EnMessage) {
cerr << "LogLevel less than minimum value (Message) = " << LogStream::EnMessage << endl;
return 1;
}
if (loglevel > LogStream::EnError) {
cerr << "LogLevel greater than maximum value (Error) = " << LogStream::EnError << endl;
return 1;
}
}
} else if (0 == strcmp(arg, "-n")) {
if ( ++index < argc ) {
numinstances = atoi(argv[index]);
if (numinstances < 1) {
cerr << "NumInstances less than minimum value 1 "<< endl;
return 1;
}
if (out.length() > 0) {
cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
return 1;
}
}
} else if (0 == strcmp(arg, "-rand")) {
randomize = true;
} else if (0 == strcmp(arg, "-r")) {
if ( ++index < argc ) {
numruns = atoi(argv[index]);
if (numruns < 1) {
cerr << "Number of runs less than minimum value 1 "<< endl;
return 1;
}
}
} else if (0 == strcmp(arg, "-rdelay")) {
if ( ++index < argc ) {
rdelay = atol(argv[index]);
rdelay = rdelay; //convert to microsec
if (rdelay < 1) {
cerr << "Random delay in millis less than minimum value of 1 "<< endl;
return 1;
}
}
} else { //one of the standard params - whichever we haven't read yet
if (cnfg == NULL) {
cnfg = arg;
} else if (in.length() == 0) {
in.append(arg);
} else if (out.length() == 0) {
out.append(arg);
if (numinstances > 1) {
cerr << "Output directory may not be specified when NumInstances is more than 1." << endl;
return 1;
}
}
}
} //while
if (in.length() == 0 || index > argc) { // Too few args or no arg after -s or -l
tell();
return 1;
}
if (out == in) {
cout << "runAECpp: ERROR: input and output file paths are the same " << endl;
return -1;
}
/* Create/link up to a UIMACPP resource manager instance (singleton) */
(void) ResourceManager::createInstance("runAECpp");
/* check if input file / directory exists */
uima::util::Filename infn(in.c_str());
if (!infn.isExistent() ) {
cout << "runAECpp: ERROR: input file / directory does not exist. " << endl;
return -1;
}
if (loglevel >= 0) {
ResourceManager::getInstance().setLoggingLevel((LogStream::EnEntryType)loglevel);
}
ErrorInfo errorInfo;
if (numinstances == 1) {
AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
cerr << "runAECpp:" << endl
<< " Error string : "
<< AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
<< " UIMACPP Error info:" << endl
<< errorInfo << endl;
exit((int)errorInfo.getErrorId());
}
processInputFiles(pEngine);
} else {
apr_status_t rv = 0;
//APR pool
apr_pool_t *pool;
rv = apr_pool_create(&pool, NULL);
if (rv != APR_SUCCESS) {
cerr << "ERROR: apr_pool_create() failed. " << endl;
return -1;
}
/*create as many AnalysisEngine instances as specified
by numinstances. */
vector<AnalysisEngine *> analysisEngines;
for (int i=0; i < numinstances; i++) {
AnalysisEngine * pEngine = Framework::createAnalysisEngine(cnfg, errorInfo);
if (errorInfo.getErrorId() != UIMA_ERR_NONE) {
cerr << "runAECpp:" << endl
<< " Error string : "
<< AnalysisEngine::getErrorIdAsCString(errorInfo.getErrorId()) << endl
<< " UIMACPP Error info:" << endl
<< errorInfo << endl;
exit((int)errorInfo.getErrorId());
}
analysisEngines.push_back(pEngine);
}
cerr << "Initialized AnalysisEngine " << endl;
/* create and start the processing threads */
apr_threadattr_t * thd_attr=0;
rv = apr_threadattr_create(&thd_attr, pool);
assert(rv == APR_SUCCESS);
vector<apr_thread_t *> processingThreads;
for (int i=0; i < numinstances; i++) {
apr_thread_t *thread=0;
rv = apr_thread_create(&thread, thd_attr, process, analysisEngines.at(i), pool);
assert(rv == APR_SUCCESS);
processingThreads.push_back(thread);
apr_sleep(10000); //required so that time function to distinctly seed randomizer.
}
cerr << "Wait for processing threads to finish " << endl;
/* wait for threads to end */
for (size_t i=0; i < processingThreads.size(); i++) {
//cout << "runAECpp: wait for thread " << i << " to end " << endl;
apr_thread_join(&rv, processingThreads.at(i));
}
if (pool) {
apr_pool_destroy(pool);
pool=0;
}
}
} catch (Exception e) {
cout << "runAECpp: " << e << endl;
}
/* If we got this far everything went OK */
cout << "runAECpp: processing finished sucessfully! " << endl;
return(0);
}
/* Little helper routine to check and report errors.
This routine just does a hard program exit for any failure!
*/
static void tafCheckError(TyErrorId utErrorId,
const AnalysisEngine & crEngine) {
if (utErrorId != UIMA_ERR_NONE) {
cerr << "runAECpp:" << endl;
cerr << " Error number : "
<< utErrorId << endl;
cerr << " Error string : "
<< AnalysisEngine::getErrorIdAsCString(utErrorId) << endl;
const TCHAR * errStr = crEngine.getAnnotatorContext().getLogger().getLastErrorAsCStr();
if (errStr != NULL)
cerr << " Last logged message : " << errStr << endl;
exit((int)utErrorId);
}
}
/* Similar routine as above just with error info objects instead of err-ids.
This routine just does a hard program exit for any failure!
*/
static void tafCheckError(ErrorInfo const & errInfo) {
if (errInfo.getErrorId() != UIMA_ERR_NONE) {
cerr << "runAECpp:" << endl
<< " Error string : "
<< AnalysisEngine::getErrorIdAsCString(errInfo.getErrorId()) << endl
<< " UIMACPP Error info:" << endl
<< errInfo << endl;
exit((int)errInfo.getErrorId());
}
}
void process (AnalysisEngine * pEngine, CAS * cas, std::string in, std::string outfn) {
cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing " << in << " " << out << endl;
try {
if (xcasInput != textFormat) {
/* initialize from an xcas or xmicas */
//cout << "runAECpp::processing xml file " << in << endl;
XMLCh* native = XMLString::transcode(in.c_str());
LocalFileInputSource fileIS(native);
XMLString::release(&native);
if (xcasInput == xcasFormat) {
XCASDeserializer::deserialize(fileIS, *cas);
}
else {
XmiDeserializer::deserialize(fileIS, *cas, lenient);
}
} else {
/* read as text file and set document text of default view */
FILE * pFile = fopen(in.c_str(),"rb");
int filesize;
if (pFile == NULL) {
cerr << "RunAECpp: Error reading file " << in << endl;
exit(-1);
}
/* allocate buffer for file contents */
struct stat fstat;
stat(in.c_str(), &fstat);
filesize = fstat.st_size;
char * pBuffer = new char[filesize+1];
if (pBuffer == NULL) {
cerr << "RunAECpp: Error allocating buffer to hold contents of file " << in << endl;
exit(-1);
}
/* read the file */
size_t numread = fread(pBuffer,1,filesize,pFile);
fclose(pFile);
/* convert to unicode and set tcas document text*/
UnicodeString ustrInputText(pBuffer, (int32_t)numread, "utf-8");
cas->setDocumentText(UnicodeStringRef(ustrInputText));
delete[] pBuffer;
}
// Is the input a tcas?
if (!useSofa && cas->isBackwardCompatibleCas()) {
useSofa = true;
sofaName = CAS::NAME_DEFAULT_TEXT_SOFA;
}
// Is a specific Sofa view specified?
if (useSofa) {
/* process the specified TCAS */
SofaFS mySofa = cas->getSofa(pEngine->getAnnotatorContext().mapToSofaID(sofaName));
if (!mySofa.isValid()) {
cerr << "runAECpp:" << endl
<< " Specified Sofa named " << sofaName
<< " not found in the input file" << endl;
exit(99);
}
CASIterator casIter = pEngine->processAndOutputNewCASes(*cas->getView(mySofa));
int i=0;
while (casIter.hasNext()) {
i++;
CAS & outCas = casIter.next();
//write out xmi
if (outfn.length() > 0) {
writeXmi(outCas, i, in, outfn);
}
//release the CAS
pEngine->getAnnotatorContext().releaseCAS(outCas);
cout << "runAECpp::processing new Cas " << i << endl;
}
} else {
/* process the CAS */
CASIterator casIter = ((AnalysisEngine*)pEngine)->processAndOutputNewCASes(*cas);
int i=0;
while (casIter.hasNext()) {
i++;
CAS & outCas = casIter.next();
//write out xmi
if (outfn.length() > 0) {
writeXmi(outCas, i, in, outfn);
}
//release CAS
pEngine->getAnnotatorContext().releaseCAS(outCas);
cout << "runAECpp::processing new Cas " << i << endl;
}
}
if (outfn.length() > 0) {
util::Filename infile((TCHAR*) in.c_str());
outfn.append("/");
outfn.append(infile.getName());
//open a file stream for output xmi
ofstream file;
file.open (outfn.c_str(), ios::out | ios::binary);
if ( !file ) {
cerr << "runAECpp: Error opening output xmi: " << outfn.c_str() << endl;
exit(99);
}
//serialize the input cas
cout << "runAECpp::processing write out xmi " << outfn << endl;
XmiWriter writer(*cas, true);
writer.write(file);
file.close();
}
} catch (Exception e) {
ErrorInfo errInfo = e.getErrorInfo();
cerr << "runAECPP::Error " << errInfo.getErrorId() << " " << errInfo.getMessage() << endl;
cerr << errInfo << endl;
}
}
void writeXmi (CAS & outCas, int num, std::string in, std::string outfn) {
util::Filename infile((TCHAR*) in.c_str());
std::string ofn;
ofn.append(outfn.c_str());
ofn.append("/");
ofn.append(infile.getName());
ofn.append("_seg_");
stringstream s;
s << num;
ofn.append(s.str());
//open a file stream for output xmi
ofstream file;
file.open (ofn.c_str(), ios::out | ios::binary);
if ( !file ) {
cerr << "Error opening output xmi: " << ofn.c_str() << endl;
exit(99);
}
//serialize the cas
cout << "runAECpp::processing write out xmi " << ofn << endl;
XmiWriter writer(outCas, true);
writer.write(file);
file.close();
}
void processInputFiles(AnalysisEngine * pEngine) {
TyErrorId utErrorId; // Variable to store UIMACPP return codes
ErrorInfo errorInfo; // Variable to stored detailed error info
int count = 0;
stringstream str;
str << endl << "ThreadId: " << apr_os_thread_current();
str << " runAECpp: Processing started. Number of runs " << numruns
<< " rdelay " << rdelay << " millis. " ;
if (randomize)
str << " Inputs processed in random order. ";
cout << str.str() << endl;
//uima::ResourceManager::getInstance().getLogger().logMessage(str.str() + " started: " );
/* Get a new CAS */
CAS* cas = pEngine->newCAS();
if (cas == NULL) {
cerr << "runAECpp: pEngine->newCAS() failed." << endl;
exit (1);
}
/* initialize random seed: */
srand( time(NULL) + (apr_time_now() % 10000) );
for (int i=0; i < numruns; i++) {
stringstream str;
cout << endl << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing start iteration: " << i << endl;
//uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
/* process input */
util::DirectoryWalk dirwalker(in.c_str());
if (dirwalker.isValid()) {
cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing all files in directory: " << in.c_str() << endl;
util::Filename infile(in.c_str(),"FilenamePlaceHolder");
if (!randomize) {
while (dirwalker.isValid()) {
// Process all files or just the ones with matching suffix
if ( dirwalker.isFile() ) {
infile.setNewName(dirwalker.getNameWithoutPath());
std::string afile(infile.getAsCString());
stringstream str;
if (count % 100 == 0 && count > 0) {
str << apr_time_now() << " ThreadId: " << apr_os_thread_current() << " numProcessed=" << count;
cerr << str.str() << endl;
uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
}
//process the cas
process(pEngine,cas,afile, out);
//reset the cas
cas->reset();
if (rdelay > 0) {
int howlong = rand() % rdelay;
cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing sleep for " << howlong << " millis " << endl;
apr_sleep(howlong*1000);
}
count++;
}
//get the next input file in the directory
dirwalker.setToNext();
}
} else {
//construct a list of the input files.
vector<std::string> filenames;
while (dirwalker.isValid()) {
// Process all files or just the ones with matching suffix
if ( dirwalker.isFile() ) {
infile.setNewName(dirwalker.getNameWithoutPath());
filenames.push_back(infile.getAsCString());
}
//get the next input file in the directory
dirwalker.setToNext();
}
//how many to process in this run.
int num = filenames.size();
for (int i=0; i < num; i++) {
//select next file to be processed.
int index = rand() % filenames.size(); //number between 1 and number of files
stringstream str;
if (count % 100 == 0 && count > 0) {
str << apr_time_now() << " ThreadId: " << apr_os_thread_current() << " runAECpp::processing numProcessed=" << count;
cerr << str.str() << endl;
uima::ResourceManager::getInstance().getLogger().logMessage(str.str() );
}
string afile = filenames.at(index);
//cout << "ThreadId: " << apr_os_thread_current() << "runAECpp::processing file " << index << " " << afile << endl;
//process
process(pEngine, cas, afile, out);
cas->reset();
//sleep for time specified by rdelay
if (rdelay > 0) {
int howlong = rand() % rdelay;
cout << "ThreadId: " << apr_os_thread_current() << " runAECpp::processing sleep for " << howlong << " millis " << endl;
apr_sleep(howlong*1000);
}
count++;
}
}
} else {
//process the cas
process(pEngine,cas, in, out);
}
/* call collectionProcessComplete */
utErrorId = pEngine->collectionProcessComplete();
}
/* Free annotator */
utErrorId = pEngine->destroy();
delete cas;
delete pEngine;
cout << "ThreadId: " << apr_os_thread_current() << " runAECpp finished processing." << endl;
}
/* <EOF> */