blob: 046e53a8c30ed19a53cea50e2192c1d6457f291f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ee.test;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.log4j.Logger;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMA_IllegalStateException;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaASPingTimeout;
import org.apache.uima.aae.error.UimaASProcessCasTimeout;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.adapter.jms.message.JmsMessageContext;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.TypeSystem;
import org.apache.uima.cas.impl.XmiCasDeserializer;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.ee.test.utils.BaseTestSupport;
import org.apache.uima.ee.test.utils.UimaASTestRunner;
import org.apache.uima.internal.util.XMLUtils;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.TypePriorityList;
import org.apache.uima.resourceSpecifier.factory.DeploymentDescriptorFactory;
import org.apache.uima.resourceSpecifier.factory.ServiceContext;
import org.apache.uima.resourceSpecifier.factory.UimaASPrimitiveDeploymentDescriptor;
import org.apache.uima.resourceSpecifier.factory.impl.ServiceContextImpl;
import org.apache.uima.util.XMLInputSource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
@RunWith(UimaASTestRunner.class)
public class TestUimaASExtended extends BaseTestSupport {
private CountDownLatch getMetaCountLatch = null;
private static final int MaxGetMetaRetryCount = 2;
private static final String primitiveServiceQueue1 = "NoOpAnnotatorQueue";
private static final String PrimitiveDescriptor1 = "resources/descriptors/analysis_engine/NoOpAnnotator.xml";
private int getMetaRequestCount = 0;
public BaseTestSupport superRef = null;
/**
* bring up testing for compressed binary serialization
*/
/*
public static void main(String[] args ) {
JUnitCore core= new JUnitCore();
core.addListener(new UimaASJunitTestFailFastListener());
core.run(TestUimaASExtended.class);
}
*/
private String getMasterConnectorURI(BrokerService b) {
return b.getDefaultSocketURIString();
}
/*
@Test
public void testBrokerRestartWithPrimitiveMultiplier() throws Exception {
System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
broker.stop();
broker.waitUntilStopped();
broker = createBroker();
broker.start();
broker.waitUntilStarted();
String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
Map<String, Object> appCtx =
buildContext(burl, "TestMultiplierQueue");
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
runTest(appCtx, eeUimaEngine,burl,
"TestMultiplierQueue", 1, PROCESS_LATCH);
eeUimaEngine.stop();
}
*/
/**
* This test starts a secondary broker, starts NoOp Annotator, and
* using synchronous sendAndReceive() sends 5 CASes for analysis. Before sending 6th, the test
* stops the secondary broker and sends 5 more CASes. All CASes sent after
* the broker shutdown result in GetMeta ping and a subsequent timeout. Before
* sending 11th CAS the test starts the broker again and sends 10 more CASes
* @throws Exception
*/
@Test
public void testSyncClientRecoveryFromBrokerStopAndRestart() throws Exception {
System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
//System.setProperty("uima.as.enable.jmx","false");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService broker2 = setupSecondaryBroker(true);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int errorCount=0;
int delay = 1;
for (int i = 0; i < 20; i++) {
if ( i == 5 ) {
broker2.stop();
broker2.waitUntilStopped();
System.out.println("..... Stopped broker ............................");
Timer timer = new Timer();
timer.schedule(new StartBrokerTask(broker2, this),100);
//delay =5000;
}
synchronized(appCtx) {
try {
appCtx.wait(delay);
} catch( InterruptedException eee) {
}
}
/*
else if ( i == 10 ) {
// restart the broker
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
broker2 = setupSecondaryBroker(true);
broker2.start();
broker2.waitUntilStarted();
}
*/
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
errorCount++;
System.out.println("Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
// expecting 5 failures due to broker missing
// if ( errorCount != 5 ) {
// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
// }
broker2.waitUntilStopped();
}
/**
* This test creates 4 UIMA AS clients and runs each in a separate thread. There is a single
* shared jms connection to a broker that each client uses. After initialization a client
* sends 1000 CASes to a remote service. While clients are processing the test kills
* the broker, waits for 4 seconds and restarts it. While the broker is down, clients
* keep trying sending CASes, receiving Ping timeouts. Once the broker is available again
* all clients should recover and begin processing CASes again. This tests recovery of a
* shared connection.
*
* @throws Exception
*/
@Test
public void testMultipleSyncClientsRecoveryFromBrokerStopAndRestart() throws Exception {
System.out.println("-------------- testMultipleSyncClientsRecoveryFromBrokerStopAndRestart -------------");
//System.setProperty("uima.as.enable.jmx","false");
final BrokerService broker2 = setupSecondaryBroker(true);
final CountDownLatch latch = new CountDownLatch(8);
Thread[] clientThreads = new Thread[8];
// Create 4 Uima AS clients each running in a separate thread
for(int i=0; i < clientThreads.length; i++) {
clientThreads[i] = new Thread() {
public void run() {
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
try {
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 500; i++) {
if ( i == 5 ) {
latch.countDown(); // indicate that some CASes were processed
}
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
System.out.println("Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
synchronized(uimaAsEngine) {
uimaAsEngine.wait(50);
}
}
System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()");
uimaAsEngine.stop();
} catch( Exception e) {
e.printStackTrace();
return;
}
}
};
clientThreads[i].start();
}
BrokerService broker3 = null;
try {
latch.await(); // wait for all threads to process a few CASes
broker2.stop();
System.out.println("Stopping Broker - wait ...");
broker2.waitUntilStopped();
System.out.println("Restarting Broker - wait ...");
// restart the broker
broker3 = setupSecondaryBroker(true);
broker3.waitUntilStarted();
} catch ( Exception e ) {
} finally {
for(int i=0; i < clientThreads.length; i++ ) {
System.out.println(".... UIMA-AS Client#"+clientThreads[i].getId()+" Waiting for completion");
clientThreads[i].join();
System.out.println(".... UIMA-AS Client#"+clientThreads[i].getId()+" Completed");
}
System.out.println("Stopping Broker - wait ...");
if ( broker3 != null ) {
super.cleanBroker(broker3);
broker3.stop();
broker3.waitUntilStopped();
}
}
}
@Test
public void testClient() throws Exception {
System.out.println("-------------- testClient -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl uimaAsEngine1 = new BaseUIMAAsynchronousEngine_impl();
BaseUIMAAsynchronousEngine_impl uimaAsEngine2 = new BaseUIMAAsynchronousEngine_impl();
File directory = new File("./");
System.out.println(directory.getAbsolutePath());
String sid1= deployService(uimaAsEngine1, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
String sid2 = deployService(uimaAsEngine2, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
uimaAsEngine1.undeploy(sid1);
uimaAsEngine2.undeploy(sid2);
}
@Test
public void testClientWithPrimitives() throws Exception {
System.out.println("-------------- testClientWithPrimitives -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl uimaAsEngine1 = new BaseUIMAAsynchronousEngine_impl();
BaseUIMAAsynchronousEngine_impl uimaAsEngine2 = new BaseUIMAAsynchronousEngine_impl();
String sid1= deployService(uimaAsEngine1, relativePath + "/Deploy_NoOpAnnotator.xml");
String sid2 = deployService(uimaAsEngine2, relativePath + "/Deploy_NoOpAnnotator.xml");
uimaAsEngine1.undeploy(sid1);
uimaAsEngine2.undeploy(sid2);
}
/**
* Tests error handling of the client. It deploys Aggregate service Cas Multiplier. initializes
* the client and sends a CAS for processing. The child CAS is than held in NoOp Annotator for
* 30 secs to simulate heavy processing. While the CAS is being processed, a broker is stopped.
* The client should timeout after 40 secs and attempt to send 2 more CASes. Since the broker
* is down, each of these 2 CASes goes into a retry list while a Connection is being retried.
* Both should timeout, and sendAndReceive() should fail due to a timeout.
*
* @throws Exception
*/
/*
@Test
public void testClientRecoveryFromBrokerFailure() throws Exception {
System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
Map<String, Object> appCtx =
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 40000); // AE will hold the CAS for 30 secs so this needs to be larger
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
// schedule a thread that will stop the broker after 10 secs
s.schedule(
new Runnable() {
@Override
public void run() {
try {
System.out.println("Stopping Broker ...");
broker.stop();
broker.waitUntilStopped();
System.out.println("Broker Stopped...");
} catch( Exception e) {
}
}
}
, 10, TimeUnit.SECONDS);
int timeoutCount=0;
// try to send 3 CASes, each should timeout
for (int i = 0; i < 3; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
try {
System.out.println("............... Client Sending CAS #"+(i+1));
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
if ( e instanceof UimaASProcessCasTimeout ) {
timeoutCount++;
System.out.println("Client .............. "+e.getMessage());
if ( e.getCause() != null && e instanceof UimaASPingTimeout) {
System.out.println("Client .............. "+e.getCause().getMessage());
}
} else if ( e.getCause() instanceof UimaASProcessCasTimeout ) {
timeoutCount++;
System.out.println("Client .............. "+e.getCause().getMessage());
if ( e.getCause().getCause() != null && e.getCause().getCause() instanceof UimaASPingTimeout) {
System.out.println("Client .............. "+e.getCause().getCause().getMessage());
}
} else {
e.printStackTrace();
}
// System.out.println("Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
if ( timeoutCount != 3) {
uimaAsEngine.stop();
fail("Expected 3 Errors Due to Timeout, Instead Got "+timeoutCount+" Timeouts");
} else {
System.out.println("Client .............. Stopping");
uimaAsEngine.stop();
}
}
@Test
public void testBrokerRestartWithAggregateMultiplier() throws Exception {
System.out.println("-------------- testBrokerRestartWithAggregateMultiplier -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue");
synchronized(this) {
this.wait(2000);
}
broker.stop();
broker.waitUntilStopped();
synchronized(this) {
this.wait(2000);
}
broker = createBroker();
broker.start();
broker.waitUntilStarted();
synchronized(this) {
this.wait(2000);
}
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
runTest(appCtx, eeUimaEngine,burl,
"TopLevelTaeQueue", 1, PROCESS_LATCH);
// eeUimaEngine.stop();
}
*/
/**
* Tests client and service recovery from broker restart. It deploys CM service, dispatches
* a CAS for processing and while the CAS is in process, it bounces a broker. The service
* listeners should be restored and the CAS should fail due to invalid destination. Once
* the client times out, it should send 2 more CASes which should force client to re-establish
* connection with a broker and replies should come back.
*
* @throws Exception
*/
@Test
public void testBrokerRestartWithAggregateMultiplierWhileProcessingCAS() throws Exception {
System.out.println("-------------- testBrokerRestartWithAggregateMultiplierWhileProcessingCAS -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml");
Map<String, Object> appCtx =
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
s.schedule(
new Runnable() {
@Override
public void run() {
try {
System.out.println("Stopping Broker ...");
broker.stop();
broker.waitUntilStopped();
System.out.println("Broker Stopped...");
broker = createBroker();
broker.start();
broker.waitUntilStarted();
System.out.println("Broker Restarted...");
} catch( Exception e) {
}
}
}
, 10, TimeUnit.SECONDS);
for (int i = 0; i < 3; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
try {
System.out.println("............... Client Sending CAS #"+(i+1));
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
e.printStackTrace();
// System.out.println("Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
s.shutdownNow();
s.awaitTermination(10, TimeUnit.HOURS);
uimaAsEngine.stop();
}
@Test
public void testBrokerRestartWhileProcessingCAS() throws Exception {
System.out.println("-------------- testBrokerRestartWhileProcessingCAS -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWith30SecDelay.xml");
Map<String, Object> appCtx =
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
s.schedule(
new Runnable() {
@Override
public void run() {
try {
System.out.println("Stopping Broker ...");
broker.stop();
broker.waitUntilStopped();
System.out.println("Broker Stopped...");
broker = createBroker();
broker.start();
broker.waitUntilStarted();
System.out.println("Broker Restarted...");
} catch( Exception e) {
}
}
}
, 10, TimeUnit.SECONDS);
for (int i = 0; i < 1; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
e.printStackTrace();
// System.out.println("Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
s.shutdownNow();
s.awaitTermination(10, TimeUnit.HOURS);
uimaAsEngine.stop();
}
/**
* This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test
* fails due to broker bug. What happens is that when a jms client uses http
* protocol, the connection is made but the keep-alive chat between broker and
* client is not causing a timeout and an exception.
*
* The exception is internal to the broker but it also happens within amq
* client code. To get to this, a custom spring based listener is deployed
* with some of its exception handling methods overriden to capture an exception.
*
* @throws Exception
*/
@Test
public void testServiceWithHttpListeners() throws Exception {
System.out.println("-------------- testServiceWithHttpListeners -------------");
// Need java monitor object on which to sleep
Object waitObject = new Object();
// Custom spring listener with handleListenerSetupFailure() overriden to
// capture AMQ exception.
TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer();
c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888"));
c.setDestinationName("TestQ");
c.setConcurrentConsumers(2);
c.setBeanName("testServiceWithHttpListeners() - JUnit Test Listener");
c.setMessageListener(new JmsInputChannel());
//c.initialize();
//c.afterPropertiesSet();
c.start();
if ( c.isRunning() ) {
System.out.println("... Listener Ready");
}
// Keep-alive has a default 30 secs timeout. Sleep for bit longer than that
// If there is an exception due to keep-alive, an exception handler will be
// called on the TestDefaultMessageListenerContainer instance where we
// capture the error.
System.out.println("... Waiting for 40 secs");
try {
synchronized(waitObject) {
waitObject.wait(40000);
}
// had there been broker issues relateds to keep-alive the listener's failed
// flag would have been set by now. Check it and fail the test
if ( c.failed() ) {
fail("Broker Failed - Reason:"+c.getReasonForFailure());
} else {
}
} catch( Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
System.out.println("Stopping Listener");
c.stop();
c.shutdown();
}
/*
public void testContinueOnRetryFailure2() throws Exception {
System.out.println("-------------- testContinueOnRetryFailure -------------");
File tempDir = new File("target/temp");
deleteAllFiles(tempDir);
try {
tempDir.mkdir();
} catch( Exception e) {
e.printStackTrace();
throw e;
}
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_WriterAnnotatorA.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_WriterAnnotatorB.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithContinueOnRetryFailures.xml");
runTest(null, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
if (!(new File(tempDir, "WriterAnnotatorB.3")).exists()
|| (new File(tempDir, "WriterAnnotatorB.4")).exists()) {
fail("Second annotator should have run 3 times");
}
if ((new File(tempDir, "WriterAnnotatorC.1")).exists()) {
fail("Third annotator should not have seen CAS");
}
}
*/
/*
* Tests
*/
// @Test
// public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception {
// System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
// System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
//
// // Instantiate Uima AS Client
// BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// //BrokerService broker2 = setupSecondaryBroker(true);
// // Deploy Uima AS Primitive Service
// deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
//
// Map<String, Object> appCtx =
// buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue");
// appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
// appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
// appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
// initialize(uimaAsEngine, appCtx);
// waitUntilInitialized();
//
//
// broker.stop();
// broker.waitUntilStopped();
//
// //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
// //broker2 = setupSecondaryBroker(true);
// broker = createBroker();
// broker.start();
// broker.waitUntilStarted();
// int errorCount = 0;
// System.out.println("Sending CASes");
// for (int i = 0; i < 60; i++) {
// CAS cas = uimaAsEngine.getCAS();
// cas.setDocumentText("Some Text");
// try {
// uimaAsEngine.sendAndReceiveCAS(cas);
// } catch( Exception e) {
// System.out.println("Client Received Expected Error on CAS:"+(i+1));
// } finally {
// cas.release();
// }
//
//
// }
// uimaAsEngine.stop();
//
// /*
// int errorCount=0;
// for (int i = 0; i < 20; i++) {
//
// if ( i == 5 ) {
// broker2.stop();
// broker2.waitUntilStopped();
// } else if ( i == 10 ) {
// // restart the broker
// System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
// broker2 = setupSecondaryBroker(true);
//
// broker2.start();
// broker2.waitUntilStarted();
//
// }
// CAS cas = uimaAsEngine.getCAS();
// cas.setDocumentText("Some Text");
// // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
// try {
// uimaAsEngine.sendAndReceiveCAS(cas);
// } catch( Exception e) {
// errorCount++;
// System.out.println("Client Received Expected Error on CAS:"+(i+1));
// } finally {
// cas.release();
// }
// }
//
// uimaAsEngine.stop();
// super.cleanBroker(broker2);
//
// broker2.stop();
//
// // expecting 5 failures due to broker missing
// if ( errorCount != 5 ) {
// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
// }
// broker2.waitUntilStopped();
//*/
// }
/*
@Test
public void testSyncClientRecoveryFromBrokerStopAndRestart2() throws Exception {
broker.stop();
broker.waitUntilStopped();
System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService broker2 = setupSecondaryBroker(true);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 5100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
// Get meta received, bounce the broker now.
broker2.stop();
broker2.waitUntilStopped();
// ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("");
// ActiveMQConnection c = (ActiveMQConnection)f.createConnection();
// restart the broker
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
broker2 = setupSecondaryBroker(true);
broker2.start();
broker2.waitUntilStarted();
// new broker is up. Send a few CASes now
for( int i=0; i < 5; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
e.printStackTrace();
// fail("Unexpected exception from sendAndReceive()- test Failed");
} finally {
cas.release();
}
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
*/
/**
* Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
* RunAE or RunCPE.
*
* @throws Exception
*/
@Test
public void testJmsServiceAdapter() throws Exception {
Logger.getLogger(this.getClass()).info("-------------- testJmsServiceAdapter -------------");
//setUp();
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
} catch( Exception e ) {
throw e;
}
}
/**
* Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from
* RunAE or RunCPE.
*
* @throws Exception
*/
@Test
public void testJmsServiceAdapterInAsyncAggregate() throws Exception {
Logger.getLogger(this.getClass()).info("-------------- testJmsServiceAdapter -------------");
//setUp();
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AsyncAggregateWithJmsService.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
} catch( Exception e ) {
throw e;
}
}
/*
* Tests Uima AS client placeholder handling and substitution. The Uima Aggregate instantiates
* UIMA AS client proxy using Jms Client Descriptor that contains a placeholder
* ${defaultBrokerURL} instead of hard coded Broker URL. The client parses the
* placeholder string, retrieves the name (defaultBrokerURL) and uses it to look
* up tha actual broker URL in System properties.
*/
@Test
public void testJmsServiceAdapterWithPlaceholder() throws Exception {
System.out.println("-------------- testJmsServiceAdapterWithPlaceholder -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsServiceUsingPlaceholder.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
10, PROCESS_LATCH);
}
/**
* Tests use of a JMS Service Adapter and an override of the MultipleDeploymentAllowed.
* In this test, the AE descriptor of the remote service is configured with MultipleDeploymentAllowed=false
* Without the override this causes an exception when instantiating Uima aggregate with
* MultipleDeploymentAllowed=true.
*
* @throws Exception
*/
@Test
public void testJmsServiceAdapterWithOverride() throws Exception {
System.out.println("-------------- testJmsServiceAdapterWithOverride -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_SingleInstancePersonTitleAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsServiceAndScaleoutOverride.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
10, PROCESS_LATCH);
}
@Test
public void testJmsServiceAdapterWithException() throws Exception {
System.out.println("-------------- testJmsServiceAdapterWithException -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
CAS cas = eeUimaEngine.getCAS();
try {
eeUimaEngine.sendAndReceiveCAS(cas);
} catch (Exception e) {
e.printStackTrace();
} finally {
eeUimaEngine.collectionProcessingComplete();
cas.reset();
}
eeUimaEngine.stop();
}
@Test
public void testJmsServiceAdapterWithProcessTimeout() throws Exception {
System.out.println("-------------- testJmsServiceAdapterWithProcessTimeout -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsServiceLongDelay.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testJmsServiceAdapterWithGetmetaTimeout() throws Exception {
System.out.println("-------------- testJmsServiceAdapterWithGetmetaTimeout -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml");
} catch (ResourceInitializationException e) {
System.out.println("Received Expected ResourceInitializationException");
return;
}
Assert.fail("Expected ResourceInitializationException Not Thrown. Instead Got Clean Run");
}
/**
* Test binary compressed serialization between client and svc, and between
* service and remote delegate
*
* @throws Exception
*/
@Test
public void testCompressedTypeFiltering() throws Exception {
System.out.println("-------------- testCompressedTypeFiltering -------------");
// Instantiate Uima-AS Client
final BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_RoomNumberAnnotator.xml");
deployService(uimaAsEngine, relativePath + "/Deploy_MeetingDetectorTAE_RemoteRoomNumberBinary.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), "MeetingDetectorTaeQueue");
// Set an explicit getMeta (Ping)timeout
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 2000);
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
appCtx.put(UimaAsynchronousEngine.SERIALIZATION_STRATEGY, "binary");
runTest(null, uimaAsEngine, String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorTaeQueue", 3, PROCESS_LATCH);
}
/**
* Tests Broker startup and shutdown
*/
@Test
public void testBrokerLifecycle() {
System.out.println("-------------- testBrokerLifecycle -------------");
System.out.println("UIMA_HOME=" + System.getenv("UIMA_HOME")
+ System.getProperty("file.separator") + "bin" + System.getProperty("file.separator")
+ "dd2spring.xsl");
}
@Test
public void testForHang() throws Exception {
System.out
.println("-------------- testForHang -------------");
System.setProperty("BrokerURL", getMasterConnectorURI(broker));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateMultiplier.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
100, PROCESS_LATCH);
}
/**
* Tests programmatic generation of DD for deployment
*
* @throws Exception
*/
@Test
public void testGenerateAndDeployPrimitiveDD() throws Exception {
System.out.println("-------------- testGenerateAndDeployPrimitiveDD -------------");
File directory = new File (".");
// Set up a context object containing basic service deployment
// information
ServiceContext context = new ServiceContextImpl("PersonTitle",
"PersonTitle Annotator Description",
directory.getCanonicalPath() +
System.getProperty("file.separator")+
resourceDirPath+
System.getProperty("file.separator")+
"descriptors" +
System.getProperty("file.separator")+
"analysis_engine" +
System.getProperty("file.separator")+
"PersonTitleAnnotator.xml",
"PersonTitleAnnotatorQueue",
getMasterConnectorURI(broker));
context.setCasPoolSize(2);
// create DD with default settings
UimaASPrimitiveDeploymentDescriptor dd = DeploymentDescriptorFactory
.createPrimitiveDeploymentDescriptor(context);
// Get default Error Handler for process and change error threshold
dd.getProcessErrorHandlingSettings().setThresholdCount(4);
// Two instances of AE in a jvm
dd.setScaleup(2);
// Generate deployment descriptor in xml format
String ddXML = dd.toXML();
System.out.println(ddXML);
File tempFile = File.createTempFile("Deploy_PersonTitle", ".xml");
BufferedWriter out = new BufferedWriter(new FileWriter(tempFile));
out.write(ddXML);
out.close();
char FS = System.getProperty("file.separator").charAt(0);
// create Map to hold required parameters
Map<String,Object> appCtx = new HashMap<String,Object>();
appCtx.put(UimaAsynchronousEngine.DD2SpringXsltFilePath,
"../src/main/scripts/dd2spring.xsl".replace('/', FS));
appCtx.put(UimaAsynchronousEngine.SaxonClasspath,
"file:../src/main/saxon/saxon8.jar".replace('/', FS));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String aSpringContainerId =
eeUimaEngine.deploy(tempFile.getAbsolutePath(), appCtx);
eeUimaEngine.undeploy(aSpringContainerId);
eeUimaEngine.stop();
}
@Test
public void testDeployAggregateServiceWithFailingCollocatedCM() throws Exception {
System.out.println("-------------- testDeployAggregateServiceWithFailingCollocatedCM -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithFailingCollocatedCM.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
//addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
public void getLargeCAS(CAS aCAS, File xmiFile) throws IOException, CollectionException {
FileInputStream inputStream = new FileInputStream(xmiFile);
try {
XmiCasDeserializer.deserialize(inputStream, aCAS, false);
} catch (SAXException e) {
throw new CollectionException(e);
} finally {
inputStream.close();
}
}
/*
@Test
public void testLargeCAS() {
System.out.println("-------------- testLargeCAS -------------");
try {
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
// deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
// "PersonTitleAnnotatorQueue");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
CAS cas = uimaAsEngine.getCAS();
getLargeCAS(cas, new File("C:/uima/largeCASTest/NYT_ENG_20070514.0065.out.xmi"));
System.out.println("UIMA AS Client Sending CAS Request to a Service");
uimaAsEngine.sendCAS(cas);
uimaAsEngine.collectionProcessingComplete();
System.clearProperty("DefaultBrokerURL");
uimaAsEngine.stop();
} catch( Exception e) {
e.printStackTrace();
}
}
*/
/**
* Tests service quiesce and stop support. This test sets a CasPool to 1 to send just one CAS at a
* time. After the first CAS is sent, a thread is started with a timer to expire before the reply
* is received. When the timer expires, the client initiates quiesceAndStop on the top level
* controller. As part of this, the top level controller stops its listeners on the input queue
* (GetMeta and Process Listeners), and registers a callback with the InProcess cache. When the
* cache is empty, meaning all CASes are processed, the cache notifies the controller which then
* begins the service shutdown. Meanwhile, the client receives a reply for the first CAS, and
* sends a second CAS. This CAS, will remain in the queue as the service has previously stopped
* its listeners. The client times out after 10 seconds and shuts down.
*
* @throws Exception
*/
@Test
public void testQuiesceAndStop() throws Exception {
System.out.println("-------------- testQuiesceAndStop -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 10000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 10);
String containers[] = new String[1];
containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_ScaledPrimitiveAggregateAnnotator.xml");
// String containers[] = new String[2];
// containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// containers[1] = deployService(eeUimaEngine, relativePath
// + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
// spinShutdownThread(eeUimaEngine, 5000, containers, SpringContainerDeployer.QUIESCE_AND_STOP);
spinShutdownThread(eeUimaEngine, 5000, containers, SpringContainerDeployer.QUIESCE_AND_STOP);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 5000, EXCEPTION_LATCH);
//eeUimaEngine.stop();
}
/*
@Test
public void testQuiesceAndStop2() throws Exception {
System.out.println("-------------- testQuiesceAndStop -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 10000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
String containers[] = new String[1];
containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_ScaledPrimitiveAggregateAnnotator.xml");
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 100, PROCESS_LATCH);
System.out.println("------------ Undeploying ----------------");
eeUimaEngine.undeploy(containers[0] , SpringContainerDeployer.QUIESCE_AND_STOP);
// eeUimaEngine.stop();
}
*/
@Test
public void testStopNow() throws Exception {
System.out.println("-------------- testAggregateWithFailedRemoteDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String containers[] = new String[2];
containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
containers[1] = deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 4000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
spinShutdownThread(eeUimaEngine, 3000, containers, SpringContainerDeployer.STOP_NOW);
// send may fail since we forcefully stop the service. Tolerate
// ResourceProcessException
addExceptionToignore(ResourceProcessException.class);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 10, EXCEPTION_LATCH);
}
@Test
public void testSendAndReceive() throws Exception {
BaseUIMAAsynchronousEngine_impl uimaAsEngine
= new BaseUIMAAsynchronousEngine_impl();
/*
String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(uimaAsEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker),"MeetingDetectorTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
//appCtx.put(UimaAsynchronousEngine.TargetSelectorProperty, "MeetingDetector2");
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 15; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
System.out.println(".... Sending CAS "+i);
uimaAsEngine.sendAndReceiveCAS(cas);
cas.release();
}
*/
String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(uimaAsEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
// Deploy Uima AS Primitive Service
// deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker).toString(),"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int errorCount = 0;
List<AnalysisEnginePerformanceMetrics> componentMetricsList =
new ArrayList<AnalysisEnginePerformanceMetrics>();
for (int i = 0; i < 15; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas,componentMetricsList);
System.out.println("-------> Client Received Performance Metrics of Size:"+componentMetricsList.size());
for( AnalysisEnginePerformanceMetrics m :componentMetricsList ) {
System.out.println(".............. Component:"+m.getName()+" AnalysisTime:"+m.getAnalysisTime());
}
//uimaAsEngine.sendCAS(cas);
System.out.println("----------------------------------------------------");
componentMetricsList.clear();
} catch( Exception e) {
errorCount++;
} finally {
cas.release();
componentMetricsList.clear();
}
}
uimaAsEngine.stop();
}
@Test
public void testMultipleSyncClientsWithMultipleBrokers() throws Exception {
System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
class RunnableClient implements Runnable {
String brokerURL;
BaseTestSupport testSupport;
BaseUIMAAsynchronousEngine_impl uimaAsEngine;
RunnableClient(String brokerURL,BaseTestSupport testSupport) {
this.brokerURL = brokerURL;
this.testSupport = testSupport;
}
public void initialize(String dd, String serviceEndpoint) throws Exception {
uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, dd);
@SuppressWarnings("unchecked")
Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
testSupport.initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
}
public void run() {
try {
for (int i = 0; i < 1000; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service Managed by Broker:"+brokerURL);
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
e.printStackTrace();
} finally {
cas.release();
}
}
System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()");
uimaAsEngine.stop();
} catch( Exception e) {
e.printStackTrace();
}
}
}
ExecutorService executor = Executors.newCachedThreadPool();
// change broker URl in system properties
System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
RunnableClient client1 =
new RunnableClient(getMasterConnectorURI(broker), this);
client1.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
final BrokerService broker2 = setupSecondaryBroker(true);
// change broker URl in system properties
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
RunnableClient client2 =
new RunnableClient(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), this);
client2.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
Future<?> f1 = executor.submit(client1);
Future<?> f2 = executor.submit(client2);
f1.get();
f2.get();
executor.shutdownNow();
while( !executor.isShutdown() ) {
synchronized(broker) {
broker.wait(500);
}
}
try {
broker2.stop();
// broker.stop();
// broker.waitUntilStopped();
} catch( Exception e) {
// ignore this one. Always thrown by AMQ on stop().
}
}
@Test
public void testAggregateHttpTunnelling() throws Exception {
System.out.println("-------------- testAggregateHttpTunnelling -------------");
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Deploy top level aggregate that communicates with the remote via Http Tunnelling
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithHttpDelegate.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
10, CPC_LATCH);
}
@Test
public void testClientHttpTunnellingToAggregate() throws Exception {
System.out.println("-------------- testClientHttpTunnellingToAggregate -------------");
// Add HTTP Connector to the broker.
String httpURI = getHttpURI();
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
System.out.println("-------- Connecting Client To Service: "+httpURI);
runTest(null, eeUimaEngine, httpURI, "TopLevelTaeQueue", 1, CPC_LATCH);
}
@Test
public void testClientHttpTunnelling() throws Exception {
System.out.println("-------------- testClientHttpTunnelling -------------");
String httpURI = getHttpURI();
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
System.out.println("-------- Connecting Client To Service: "+httpURI);
runTest(null, eeUimaEngine, httpURI, "NoOpAnnotatorQueue", 1, PROCESS_LATCH);
}
@Test
public void testClientHttpTunnellingWithDoubleByteText() throws Exception {
System.out.println("-------------- testClientHttpTunnellingWithDoubleByteText -------------");
BufferedReader in = null;
try {
File file = new File(relativeDataPath + "/DoubleByteText.txt");
System.out.println("Checking for existence of File:" + file.getAbsolutePath());
// Process only if the file exists
if (file.exists()) {
System.out
.println(" *** DoubleByteText.txt exists and will be sent through http connector.");
System.out.println(" *** If the vanilla activemq release is being used,");
System.out
.println(" *** and DoubleByteText.txt is bigger than 64KB or so, this test case will hang.");
System.out
.println(" *** To fix, override the classpath with the jar files in and under the");
System.out
.println(" *** apache-uima-as/uima-as/src/main/apache-activemq-X.y.z directory");
System.out.println(" *** in the apache-uima-as source distribution.");
String httpURI = getHttpURI();
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
InputStream fis = new FileInputStream(file);
Reader rd = new InputStreamReader(fis, "UTF-8");
in = new BufferedReader(rd);
// Set the double-byte text. This is what will be sent to the service
String line = in.readLine();
super.setDoubleByteText(line);
int err = XMLUtils.checkForNonXmlCharacters(line);
if (err >= 0) {
fail("Illegal XML char at offset " + err);
}
System.out.println("-------- Connecting Client To Service: "+httpURI);
// Initialize and run the Test. Wait for a completion and cleanup resources.
runTest(null, eeUimaEngine, httpURI, "NoOpAnnotatorQueue", 1, CPC_LATCH);
}
} catch (Exception e) {
// Double-Byte Text file not present. Continue on with the next test
e.printStackTrace();
fail("Could not complete test");
} finally {
if (in != null) {
in.close();
}
}
}
/**
* Tests support for ActiveMQ failover protocol expressed in broker
* URL as follows "failover:(tcp:IP:Port1,tcp:IP:Port2)". The test launches a secondary
* broker, launches a Primitive service that uses that broker,
* and finally configures the UIMA AS Client to connect to the secondary broker
* and specifies an alternative broker on a different port. This test
* only tests ability of UIMA AS to handle a complex URL, and it does *not*
* test the actual failover from one broker to the next.
*
* @throws Exception
*/
@Test
public void testBrokerFailoverSupportUsingHTTP() throws Exception {
System.out.println("-------------- testBrokerFailoverSupportUsingTCP -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService broker2 = setupSecondaryBroker(false);
String bhttpURL = addHttpConnector(broker2, DEFAULT_HTTP_PORT2);
String burl = "failover:("+bhttpURL+","+broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString().replaceAll("tcp","http")+")?randomize=false";
System.setProperty("BrokerURL", burl);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx = buildContext(burl,"NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int errorCount = 0;
for (int i = 0; i < 15; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
errorCount++;
} finally {
cas.release();
}
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
/**
* Tests support for ActiveMQ failover protocol expressed in broker
* URL as follows "failover:(tcp:IP:Port1,tcp:IP:Port2)". The test launches a secondary
* broker, launches a Primitive service that uses that broker,
* and finally configures the UIMA AS Client to connect to the secondary broker
* and specifies an alternate broker on a different port. This test
* only tests ability of UIMA AS to handle a complex URL, and it does *not*
* test the actual failover from one broker to the next.
*
* @throws Exception
*/
@Test
public void testBrokerFailoverSupportUsingTCP() throws Exception {
System.out.println("-------------- testBrokerFailoverSupportUsingTCP -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService broker2 = setupSecondaryBroker(true);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx = buildContext("failover:("+System.getProperty("BrokerURL")+","+getBrokerUri()+")?randomize=false","NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int errorCount = 0;
for (int i = 0; i < 15; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
errorCount++;
} finally {
cas.release();
}
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
/**
* This test starts a secondary broker, starts NoOp Annotator, and
* using synchronous sendAndReceive() sends 10 CASes for analysis. Before sending 11th, the test
* stops the secondary broker and sends 5 more CASes. All CASes sent after
* the broker shutdown result in GetMeta ping and a subsequent timeout.
* @throws Exception
*/
@Test
public void testSyncClientRecoveryFromBrokerStop() throws Exception {
System.out.println("-------------- testSyncClientRecoveryFromBrokerStop -------------");
//System.setProperty("uima.as.enable.jmx","false");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService broker2 = setupSecondaryBroker(true);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 300);
appCtx.put(UimaAsynchronousEngine.CasPoolSize,15);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int errorCount = 0;
for (int i = 0; i < 15; i++) {
if ( i == 10 ) {
// Stop the broker
broker2.stop();
broker2.waitUntilStopped();
Timer timer = new Timer();
timer.schedule(new StartBrokerTask(broker2, this),10000);
}
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaAsEngine.sendAndReceiveCAS(cas);
} catch( Exception e) {
errorCount++;
System.out.println("Client Received Expected Error on CAS:"+(i+1)+" ErrorCount:"+errorCount);
} finally {
cas.release();
}
}
uimaAsEngine.stop();
// expecting 5 failures due to broker missing
// if ( errorCount != 5 ) {
// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures");
// }
broker2.stop();
broker2.waitUntilStopped();
}
/*
@Test
public void testAsyncClientRecoveryFromBrokerStopAndRestart() throws Exception {
System.out.println("-------------- testAsyncClientRecoveryFromBrokerStopAndRestart -------------");
BrokerService broker2 = setupSecondaryBroker(true);
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
String brokerUri = broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
Map<String, Object> appCtx =
buildContext(brokerUri, "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 200);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 200);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
int delay = 1;
for (int i = 0; i < 50; i++) {
if ( i == 10 ) {
broker2.stop();
broker2.waitUntilStopped();
System.out.println("..... Stopped broker ............................");
Timer timer = new Timer();
timer.schedule(new StartBrokerTask(broker2, this),2000);
delay = 5000;
}
synchronized(appCtx) {
try {
appCtx.wait(delay);
} catch( InterruptedException eee) {
}
}
// } else if ( i == 15 ) {
// broker2 = setupSecondaryBroker(true);
// broker2.waitUntilStarted();
// System.out.println("..... Restarted broker ............................");
//
// }
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
*/
/**
* Tests recovery from a broker restart when running multiple instances of
* UIMA AS client in the same JVM. The scenario is:
* 1) start broker
* 2) start service
* 3) create 1st client and initialize it
* 4) create 2nd client and initialize it
* 5) send 1 CAS from client#1
* 6) when reply is received kill broker and restart it
* 7) send 2 CAS from client#1
* 8) CAS #2 fails, but forces SharedConnection to reconnect to broker
* 9) CAS#3 and #4 are sent from client#1
* 10) CASes 1-4 are sent from client#2 without error
*
* @throws Exception
*/
/*
@Test
public void testMultipleClientsRecoveryFromBrokerStopAndRestart() throws Exception {
System.out.println("-------------- testMultipleClientsRecoveryFromBrokerStopAndRestart -------------");
BrokerService broker2 = setupSecondaryBroker(true);
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaClient1 = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaClient1, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 400);
initialize(uimaClient1, appCtx);
waitUntilInitialized();
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaClient2 = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx2 =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx2.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx2.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx2.put(UimaAsynchronousEngine.GetMetaTimeout, 400);
initialize(uimaClient2, appCtx2);
waitUntilInitialized();
int errorCount=0;
for (int i = 0; i < 4; i++) {
// Stop broker before second CAS is sent to the service
if ( i == 1 ) {
broker2.stop();
broker2.waitUntilStopped();
// restart broker before 3rd CAS is sent
// restart the broker
broker2 = setupSecondaryBroker(true);
broker2.waitUntilStarted();
}
CAS cas = uimaClient1.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client#1 Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaClient1.sendAndReceiveCAS(cas);
System.out.println("UIMA AS Client#1 Received Reply For CAS:"+(i+1));
} catch( Exception e) {
errorCount++;
System.out.println("UIMA AS Client#1 Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
System.out.println("Done with UIMA AS Client#1");
for (int i = 0; i < 4; i++) {
CAS cas = uimaClient2.getCAS();
cas.setDocumentText("Some Text");
System.out.println("UIMA AS Client#2 Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaClient2.sendAndReceiveCAS(cas);
} catch( Exception e) {
errorCount++;
System.out.println("UIMA AS Client#2 Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
uimaClient1.stop();
uimaClient2.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
*/
/**
* Tests ability of an aggregate to recover from a Broker restart. The broker managing
* delegate's input queue is stopped after 1st CAS is fully processed. As part of error
* handling the listener on delegate temp reply queue is stopped and a delegate marked
* as FAILED. The aggregate error handling is configured to retry the command and as
* part of retry a new temp queue and a listener are created for the delegate when
* the broker is restarted. When a 2nd CAS is sent from a client to the aggregate the
* aggregate will force retry of the previous CAS. Once this is done, 2nd CAS is sent
* to the delegate and processing continues.
* @throws Exception
*/
@Test
public void testAggregateRecoveryFromBrokerStopAndRestart() throws Exception {
System.out.println("-------------- testAggregateRecoveryFromBrokerStopAndRestart -------------");
runAggregateRecoveryFromBrokerStopAndRestart("Deploy_AggregateWithRemoteNoOpOnBroker8200.xml");
}
/**
* This test starts a secondary broker, starts NoOp Annotator, and
* using asynchronous send() sends a total of 15 CASes for analysis. After processing 11th
* the test stops the secondary broker and sends 4 more CASes which fails due to broker not running.
*
* @throws Exception
*/
@Test
public void testAsyncClientRecoveryFromBrokerStop() throws Exception {
System.out.println("-------------- testAsyncClientRecoveryFromBrokerStop -------------");
BrokerService broker2 = setupSecondaryBroker(true);
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
Map<String, Object> appCtx =
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 15);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 15; i++) {
if ( i == 10 ) {
broker2.stop();
broker2.waitUntilStopped();
}
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
uimaAsEngine.stop();
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
/**
* Tests ability of an aggregate to recover from a Broker restart. The broker managing
* delegate's input queue is stopped after 1st CAS is fully processed. As part of error
* handling the listener on delegate temp reply queue is stopped and a delegate marked
* as FAILED. The aggregate error handling is configured with no retries. After the broker
* is stopped, the listener on a temp reply queue is shutdown. When the broker is restarted
* the client sends a new CAS which forces creation of a new temp queue and new listener.
* @throws Exception
*/
@Test
public void testAggregateRecoveryFromBrokerStopAndRestartNoDelegateRetries() throws Exception {
System.out.println("-------------- testAggregateRecoveryFromBrokerStopAndRestartNoDelegateRetries -------------");
runAggregateRecoveryFromBrokerStopAndRestart("Deploy_AggregateWithRemoteNoOpOnBroker8200NoRetry.xml");
}
private void runAggregateRecoveryFromBrokerStopAndRestart(String aggregateDescriptor ) throws Exception {
BrokerService broker2 = setupSecondaryBroker(false);
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaClient1 = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaClient1, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
deployService(uimaClient1, relativePath + "/"+aggregateDescriptor);
Map<String, Object> appCtx =
buildContext(getMasterConnectorURI(broker), "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaClient1, appCtx);
waitUntilInitialized();
int errorCount=0;
for (int i = 0; i < 10; i++) {
// Stop broker before second CAS is sent to the service
if ( i == 1 ) {
System.out.println("Stopping Secondary Broker Running on Port:"+broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
broker2.stop();
broker2.waitUntilStopped();
// restart broker before 3rd CAS is sent
// restart the broker
broker2 = setupSecondaryBroker(true);
broker2.waitUntilStarted();
}
CAS cas = uimaClient1.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
try {
uimaClient1.sendAndReceiveCAS(cas);
// System.out.println("UIMA AS Client Received Reply For CAS#" + (i + 1) );
} catch( Exception e) {
errorCount++;
System.out.println("UIMA AS Client Received Expected Error on CAS:"+(i+1));
} finally {
cas.release();
}
}
uimaClient1.stop();
System.out.println("Stopping Broker - wait ...");
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
/**
* Tests sending CPC after CAS timeout. The service is a Primitive taking
* 6 seconds to process a CAS. The client waits for 5 secs to force
* a timeout. This test forces the client to send GetMeta Ping and to
* 'hold' the subsequent CPC request.
*
* @throws Exception
*/
@Test
public void testCpcAfterCasTimeout() throws Exception {
System.out.println("-------------- testCpcAfterCasTimeout -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorAWithLongDelay.xml");
Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker),
"NoOpAnnotatorAQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 5000);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for( int i=0; i < 3; i++ ) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
uimaAsEngine.sendCAS(cas); // will timeout after 5 secs
uimaAsEngine.collectionProcessingComplete(); // the CPC should not
// be sent to a service until the timeout occurs.
}
uimaAsEngine.stop();
}
@Test
public void testClientCRProcess() throws Exception {
System.out.println("-------------- testClientCRProcess -------------");
super.resetCASesProcessed();
// Instantiate Uima AS Client
final BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.FINEST);
// UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.FINEST);
// UIMAFramework.getLogger().setLevel(Level.FINEST);
// UIMAFramework.getLogger().setOutputStream(System.out);
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueueLongDelay");
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.CasPoolSize,10);
String collectionReaderDescriptor =
resourceDirPath + System.getProperty("file.separator") +
"descriptors"+ System.getProperty("file.separator") +
"collection_reader"+ System.getProperty("file.separator") +
"FileSystemCollectionReader.xml";
// add Collection Reader if specified
try {
CollectionReaderDescription collectionReaderDescription =
UIMAFramework.getXMLParser()
.parseCollectionReaderDescription(new XMLInputSource(collectionReaderDescriptor));
collectionReaderDescription.getCollectionReaderMetaData().
getConfigurationParameterSettings().
setParameterValue("InputDirectory", relativeDataPath);
CollectionReader collectionReader = UIMAFramework
.produceCollectionReader(collectionReaderDescription);
uimaAsEngine.setCollectionReader(collectionReader);
} catch( Throwable e) {
e.printStackTrace();
}
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
uimaAsEngine.process();
Assert.assertEquals(8, getNumberOfCASesProcessed());
System.clearProperty("DefaultBrokerURL");
uimaAsEngine.stop();
}
private class Killer {
Timer timer;
public Killer(int secs) {
timer = new Timer();
timer.schedule(new KillerTask(), secs*1000);
}
class KillerTask extends TimerTask {
public void run() {
System.out.println("----------------- KillerTask calling System.exit()");
timer.cancel();
System.exit(0);
}
}
}
@Test
public void testClientProcess() throws Exception {
System.out.println("-------------- testClientProcess -------------");
// Instantiate Uima AS Client
final BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
appCtx.put(UimaAsynchronousEngine.CasPoolSize,2);
appCtx.put(UimaAsynchronousEngine.SERIALIZATION_STRATEGY, "xmi");
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 50; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
uimaAsEngine.collectionProcessingComplete();
System.clearProperty("DefaultBrokerURL");
uimaAsEngine.stop();
}
@Test
public void testClientProcessTimeout() throws Exception {
System.out
.println("-------------- testClientProcessTimeout -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueueLongDelay");
appCtx.put(UimaAsynchronousEngine.Timeout, 300);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
Object o = new Object();
for (int i = 0; i < 6; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
synchronized(o) {
o.wait(1000);
}
}
uimaAsEngine.collectionProcessingComplete();
uimaAsEngine.stop();
}
@Test
public void testClientBrokerPlaceholderSubstitution() throws Exception {
System.out.println("-------------- testClientBrokerPlaceholderSubstitution -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
System.setProperty( "defaultBrokerURL", getMasterConnectorURI(broker));
Map<String, Object> appCtx = buildContext("${defaultBrokerURL}","PersonTitleAnnotatorQueue");
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 10; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
uimaAsEngine.collectionProcessingComplete();
uimaAsEngine.stop();
}
@Test
public void testClientEndpointPlaceholderSubstitution() throws Exception {
System.out.println("-------------- testClientEndpointPlaceholderSubstitution -------------");
// Instantiate Uima AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
// Nest the placeholders in the broker & endpoint strings
String url = getMasterConnectorURI(broker);
System.setProperty( "defaultBrokerURL", url.substring(2,url.length()-2));
String brokerUrl = url.substring(0,2) + "${defaultBrokerURL}" + url.substring(url.length()-2);
System.setProperty( "PersonTitleEndpoint", "TitleAnnotator");
String endpoint = "Person${PersonTitleEndpoint}Queue"; // "PersonTitleAnnotatorQueue"
Map<String, Object> appCtx = buildContext(brokerUrl, endpoint);
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 10; i++) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
// System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service");
uimaAsEngine.sendCAS(cas);
}
uimaAsEngine.collectionProcessingComplete();
uimaAsEngine.stop();
}
@Test
public void testClientCpcTimeout() throws Exception {
System.out.println("-------------- testClientCpcTimeout -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
addExceptionToignore(org.apache.uima.aae.error.UimaASCollectionProcessCompleteTimeout.class);
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithCpCDelay.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue");
// Set an explicit CPC timeout as exceptions thrown in the 2nd annotator's CPC don't reach the
// client.
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 2000);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue", 1, CPC_LATCH); // PC_LATCH);
}
/**
* Tests handling of multiple calls to initialize(). A subsequent call to initialize should result
* in ResourceInitializationException.
*
* @throws Exception
*/
@Test
public void testInvalidInitializeCall() throws Exception {
System.out.println("-------------- testInvalidInitializeCall -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
try {
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
System.out.println("First Initialize Call Completed");
eeUimaEngine.initialize(appCtx);
fail("Subsequent call to initialize() did not return expected exception:"
+ UIMA_IllegalStateException.class
+ " Subsequent call to initialize succeeded with no error");
} catch (ResourceInitializationException e) {
if (e.getCause() != null && !(e.getCause() instanceof UIMA_IllegalStateException)) {
fail("Invalid Exception Thrown. Expected:" + UIMA_IllegalStateException.class
+ " Received:" + e.getClass());
} else {
System.out.println("Received Expected Exception:" + UIMA_IllegalStateException.class);
}
} catch (ServiceShutdownException e) {
// expected
} finally {
eeUimaEngine.stop();
}
}
/**
* Tests deployment of a primitive Uima-AS Service (PersontTitleAnnotator). Deploys the primitive
* in the same jvm using Uima-AS Client API and blocks on a monitor until the Uima Client calls
* initializationComplete() method. Once the primitive service starts it is expected to send its
* metadata to the Uima client which in turn notifies this object with a call to
* initializationComplete() where the monitor is signaled to unblock the thread. This code will
* block if the Uima Client does not call initializationComplete()
*
* @throws Exception
*/
@Test
public void testDeployPrimitiveService() throws Exception {
System.out.println("-------------- testDeployPrimitiveService -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue", 0, EXCEPTION_LATCH);
}
@Test
public void testDeployPrimitiveServiceWithTargeting() throws Exception {
System.out.println("-------------- testDeployPrimitiveService -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceOne");
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml");
System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceTwo");
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml");
System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceThree");
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml");
System.getProperties().remove(UimaAsynchronousEngine.TargetSelectorProperty);
Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker),
"NoOpAnnotatorQueue");
int toProcess = 10;
try {
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
String tsId = "ServiceOne";
for( int i=0; i < 10; i++ ) {
CAS cas = eeUimaEngine.getCAS();
//cas.setDocumentText("");
eeUimaEngine.sendAndReceiveCAS(cas, null, tsId);
String serviceIdWhereCasWasProcessed = cas.getDocumentText();
if ( !serviceIdWhereCasWasProcessed.equals(tsId)) {
fail("Received Reply from a Wrong Service - Expected: "+tsId+" Instead Received "+serviceIdWhereCasWasProcessed);
}
cas.release();
}
tsId = "ServiceTwo";
for( int i=0; i < 10; i++ ) {
CAS cas = eeUimaEngine.getCAS();
//cas.setDocumentText("");
eeUimaEngine.sendAndReceiveCAS(cas, null, tsId);
String serviceIdWhereCasWasProcessed = cas.getDocumentText();
if ( !serviceIdWhereCasWasProcessed.equals(tsId)) {
fail("Received Reply from a Wrong Service - Expected: "+tsId+" Instead Received "+serviceIdWhereCasWasProcessed);
}
cas.release();
}
tsId = "ServiceThree";
for( int i=0; i < toProcess; i++ ) {
CAS cas = eeUimaEngine.getCAS();
//cas.setDocumentText("");
eeUimaEngine.sendCAS(cas, tsId);
//cas.release();
}
} catch( Exception e) {
e.printStackTrace();
} finally {
while( getNumberOfCASesProcessed() < toProcess ) {
if ( unexpectedException ) {
fail("Service Targeting Failed");
break;
}
synchronized(this) {
wait(100);
}
}
eeUimaEngine.stop();
}
// synchronized(this) {
// wait(0);
// }
// runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
// "PersonTitleAnnotatorQueue", 0, EXCEPTION_LATCH);
}
@Test
public void testDeployPrimitiveServiceWithInitFailure() throws Exception {
System.out.println("-------------- testDeployPrimitiveServiceWithInitFailure -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithInitException.xml");
} catch( ResourceInitializationException e) {
System.out.println("Received Expected ResourceInitializationException On Service Deploy");
return;
}
fail("Expected ResourceInitializationException Not Thrown from deployed Service.");
}
@Test
public void testTypeSystemMerge() throws Exception {
System.out.println("-------------- testTypeSystemMerge -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath+ "/Deploy_GovernmentOfficialRecognizer.xml");
deployService(eeUimaEngine, relativePath+ "/Deploy_NamesAndPersonTitlesRecognizer.xml");
deployService(eeUimaEngine, relativePath+ "/Deploy_TokenSentenceRecognizer.xml");
deployService(eeUimaEngine, relativePath+ "/Deploy_AggregateToTestTSMerge.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
try {
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
// Check if the type system returned from the service contains
// expected types
CAS cas = eeUimaEngine.getCAS();
TypeSystem ts = cas.getTypeSystem();
// "example.EmailsAddress" type was 'contributed' by the Flow Controller
if ( ts.getType("example.EmailAddress") == null ) {
fail("Incomplete Type system. Expected Type 'example.EmailAddress' missing from the CAS type system");
} else if ( ts.getType("example.GovernmentOfficial") == null) {
fail("Incomplete Type system. Expected Type 'example.GovernmentOfficial' missing from the CAS type system");
} else if ( ts.getType("example.Name") == null) {
fail("Incomplete Type system. Expected Type 'example.Name' missing from the CAS type system");
} else if ( ts.getType("example.PersonTitle") == null) {
fail("Incomplete Type system. Expected Type 'example.PersonTitle' missing from the CAS type system");
} else if ( ts.getType("example.PersonTitleKind") == null) {
fail("Incomplete Type system. Expected Type 'example.PersonTitleKind' missing from the CAS type system");
} else if ( ts.getType("org.apache.uima.examples.tokenizer.Sentence") == null) {
fail("Incomplete Type system. Expected Type 'org.apache.uima.examples.tokenizer.Sentence' missing from the CAS type system");
} else if ( ts.getType("org.apache.uima.examples.tokenizer.Token") == null) {
fail("Incomplete Type system. Expected Type 'org.apache.uima.examples.tokenizer.Token' missing from the CAS type system");
}
} catch (ResourceInitializationException e) {
fail("Initialization Exception");
} catch (Exception e) {
} finally {
eeUimaEngine.stop();
}
}
/**
* Tests detection of misconfiguration between deployement descriptor and AE descriptor.
* The AE descriptor is configured to allow one instance of the AE while the deployment
* descriptor attempts to scale out the service containing the AE. ResourceInitializationException
* is expected.
*
* @throws Exception
*/
@Test
public void testMultiInstanceDeployFailureInPrimitiveService() throws Exception {
System.out.println("-------------- testMultiInstanceDeployFailureInPrimitiveService -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_ScaledPersonTitleAnnotator.xml");
fail("Expected ResourceInitializationException Due to Misconfiguration but instead the service initialized successfully");
} catch ( ResourceInitializationException e) {
// expected
}
}
/**
* Tests processing of an Exception that a service reports on CPC
*
* @throws Exception
*/
@Test
public void testDeployAggregateWithDelegateCpCException() throws Exception {
System.out.println("-------------- testDeployAggregateWithDelegateCpCException -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithCpCException.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
System.out.println("Client Initialized");
CAS cas = eeUimaEngine.getCAS();
for (int i = 0; i < 3; i++) {
eeUimaEngine.sendAndReceiveCAS(cas);
cas.reset();
}
System.out.println("Client Sending CPC");
// Send CPC. The service should recreate a session and send CPC reply
eeUimaEngine.collectionProcessingComplete();
// Now send some CASes and sleep to let the inactivity timer pop again
for (int i = 0; i < 3; i++) {
eeUimaEngine.sendAndReceiveCAS(cas); // This will start a timer on reply queue
cas.reset();
}
// Send another CPC
eeUimaEngine.collectionProcessingComplete();
eeUimaEngine.stop();
}
@Test
public void testDeployPrimitiveServiceWithCpCException() throws Exception {
System.out.println("-------------- testDeployPrimitiveServiceWithCpCException -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithCpCException.xml");
// Add expected exception so that we release CPC Latch
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue", 1, PROCESS_LATCH);
}
/**
* Tests sending CPC request from a client that does not send CASes to a service
*
* @throws Exception
*/
@Test
public void testCpCWithNoCASesSent() throws Exception {
System.out.println("-------------- testCpCWithNoCASesSent -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(uimaAsEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for (int i = 0; i < 10; i++) {
System.out.println("UIMA AS Client Sending CPC Request to a Service");
uimaAsEngine.collectionProcessingComplete();
}
uimaAsEngine.stop();
}
/**
* Tests inactivity timeout on a reply queue. The service stops a session after 5 seconds of
* sending GetMeta reply. The client than waits for 10 seconds and sends a CPC.
*
* @throws Exception
*/
@Test
public void testServiceInactivityTimeoutOnReplyQueue() throws Exception {
System.out.println("-------------- testServiceInactivityTimeoutOnReplyQueue -------------");
String sessionTimeoutOverride = System.getProperty("SessionTimeoutOverride");
System.setProperty("SessionTimeoutOverride", "5000");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 1000);
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
System.out.println("Client Initialized");
CAS cas = eeUimaEngine.getCAS();
eeUimaEngine.sendAndReceiveCAS(cas); // This will start a timer on reply queue
cas.reset();
// Now sleep for 8 seconds to let the service timeout on its reply queue due
// to a 5 second inactivity timeout
Thread.currentThread().sleep(8000);
System.out.println("Client Sending CPC");
// Send CPC. The service should recreate a session and send CPC reply
eeUimaEngine.collectionProcessingComplete();
// Now send some CASes and sleep to let the inactivity timer pop again
for (int i = 0; i < 5; i++) {
eeUimaEngine.sendAndReceiveCAS(cas); // This will start a timer on reply queue
cas.reset();
if (i == 3) {
Thread.currentThread().sleep(8000);
}
}
// Send another CPC
eeUimaEngine.collectionProcessingComplete();
eeUimaEngine.stop();
// Reset inactivity to original value or remove if it was not set
if (sessionTimeoutOverride != null) {
System.setProperty("SessionTimeoutOverride", sessionTimeoutOverride);
} else {
System.clearProperty("SessionTimeoutOverride");
}
}
/**
* Tests handling of ResourceInitializationException that happens in a collocated primitive
*
* @throws Exception
*/
@Test
public void testDeployAggregateServiceWithFailingCollocatedComponent() throws Exception {
System.out
.println("-------------- testDeployAggregateServiceWithFailingCollocatedComponent -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithFailingCollocatedDelegate.xml");
} catch (ResourceInitializationException e) {
// This is expected
} catch (Exception e) {
e.printStackTrace();
fail("Expected ResourceInitializationException Instead Caught:" + e.getClass().getName());
}
}
@Test
public void testClientProcessTimeoutWithAggregateMultiplier() throws Exception {
System.out.println("-------------- testClientProcessTimeoutWithAggregateMultiplier -------------");
addExceptionToignore(org.apache.uima.aae.error.UimaASProcessCasTimeout.class);
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWithDelay.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 3000);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
initialize(uimaAsEngine, appCtx);
waitUntilInitialized();
for( int i=0; i < 2; i++ ) {
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText("Some Text");
uimaAsEngine.sendCAS(cas); // will timeout after 5 secs
uimaAsEngine.collectionProcessingComplete(); // the CPC should not
// be sent to a service until the timeout occurs.
}
uimaAsEngine.stop();
}
@Test
public void testDeployAggregateService() throws Exception {
System.out.println("-------------- testDeployAggregateService -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// System.setProperty("BrokerURL", "tcp::/localhost:61616");
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
// Map<String, Object> appCtx = buildContext("tcp://localhost:61616",
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testDeployAggregateServiceWithDoctype() throws Exception {
System.out.println("-------------- testDeployAggregateServiceWithDoctype -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
try {
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithDOCTYPE.xml");
} catch( Exception e ) {
if ( e.getMessage() != null &&
e.getMessage().indexOf("disallow-doctype-decl") > 0 ) {
System.out.println("---- Detected expected error during parsing of a deployment descriptor - SUCCESS ----");
return; // success - detected expected error
}
}
fail("This test should have failed with SAXParseException - Instead no error was detected");
}
@Test
public void testAggregateTypePriorities() throws Exception {
System.out.println("-------------- testAggregateTypePriorities -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
// Map<String, Object> appCtx = buildContext("tcp://localhost:61616",
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
ProcessingResourceMetaData meta = eeUimaEngine.getMetaData();
TypePriorityList[] pl = meta.getTypePriorities().getPriorityLists();
for( TypePriorityList tp : pl ) {
String[] typeArray = tp.getTypes();
Assert.assertEquals(true, typeArray[0].equals("uima.cas.TOP"));
Assert.assertEquals(true, typeArray[1].equals("uima.tcas.Annotation"));
}
eeUimaEngine.stop();
}
/**
* Sends total of 10 CASes to async aggregate configured to process 2 CASes at a time.
* The inner NoOp annotator is configured to sleep for 5 seconds. The client should
* be receiving 2 ACKs simultaneously confirming that the aggregate is processing 2
* input CASes at the same time.
*
* @throws Exception
*/
@Test
public void testDeployAggregateServiceWithScaledInnerNoOp() throws Exception {
System.out.println("-------------- testDeployAggregateServiceWithScaledInnerNoOp -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithScaledInnerNoOp.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 5);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
10, PROCESS_LATCH);
}
@Test
public void testDeployAggregateServiceWithDelegateTimeoutAndContinueOnError() throws Exception {
System.out.println("-------------- testDeployAggregateServiceWithDelegateTimeoutAndContinueOnError -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithDelegateTimeoutAndContinueOnError.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),"TopLevelTaeQueue");
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testScaledSyncAggregateProcess() throws Exception {
System.out.println("-------------- testScaledSyncAggregateProcess -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_ScaledPrimitiveAggregateAnnotator.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
5, PROCESS_LATCH);
System.out.println(eeUimaEngine.getPerformanceReport());
}
@Test
public void testAggregateWithFailedRemoteDelegate() throws Exception {
System.out.println("-------------- testAggregateWithFailedRemoteDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithExceptionOn5thCAS.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithFailedRemoteDelegate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testCMAggregateClientStopRequest() throws Exception {
System.out.println("-------------- testCMAggregateClientStopRequest -------------");
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_CMAggregateWithCollocated1MillionDocsCM.xml");
superRef = this;
Thread t = new Thread() {
public void run() {
try {
// Wait for some CASes to return from the service
while (superRef.getNumberOfCASesProcessed() == 0) {
// No reply received yet so wait for 1 second and
// check again
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
}
// The client received at least one reply, wait
// at this point the top level service should show a connection error
synchronized (this) {
// wait for 3 seconds before stopping
this.wait(5000);
}
eeUimaEngine.stopProducingCases();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
t.start();
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testCMAggregateClientStopRequest2() throws Exception {
System.out.println("-------------- testCMAggregateClientStopRequest2 -------------");
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith1MillionDocs.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith1MillionDocs.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_CMAggregateWithRemote1MillionDocsCM.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_CMAggregateWithRemote1MillionDocsCM.xml");
superRef = this;
Thread t = new Thread() {
public void run() {
try {
// Wait for some CASes to return from the service
while (superRef.getNumberOfCASesProcessed() == 0) {
// No reply received yet so wait for 1 second and
// check again
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
}
// The client received at least one reply, wait
// at this point the top level service should show a connection error
synchronized (this) {
// wait for 3 seconds before stopping
this.wait(3000);
}
eeUimaEngine.stopProducingCases();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
t.start();
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testAggregateCMWithFailedRemoteDelegate() throws Exception {
System.out.println("-------------- testAggregateCMWithFailedRemoteDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithExceptionOn5thCAS.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateCMWithFailedRemoteDelegate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testAggregateCMWithFailedCollocatedDelegate() throws Exception {
System.out.println("-------------- testAggregateCMWithFailedCollocatedDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateCMWithFailedCollocatedDelegate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testComplexAggregateCMWithFailedCollocatedDelegate() throws Exception {
System.out
.println("-------------- testComplexAggregateCMWithFailedCollocatedDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath
+ "/Deploy_ComplexAggregateWithFailingInnerAggregateCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testAggregateCMWithRemoteCMAndFailedRemoteDelegate() throws Exception {
System.out
.println("-------------- testAggregateCMWithRemoteCMAndFailedRemoteDelegate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithExceptionOn5thCAS.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith1MillionDocs.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateCMWithRemoteCMAndFailedRemoteDelegate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
/**
* Tests a simple Aggregate with one remote Delegate and collocated Cas Multiplier
*
* @throws Exception
*/
@Test
public void testDeployAggregateServiceWithBrokerPlaceholder() throws Exception {
System.out
.println("-------------- testDeployAggregateServiceWithBrokerPlaceholder -------------");
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
try {
Thread t = new Thread() {
public void run() {
BrokerService bs = null;
try {
// at this point the top level service should show a connection error
synchronized (this) {
this.wait(5000); // wait for 5 secs
}
// Create a new broker that runs a different port that the rest of testcases
bs = setupSecondaryBroker(false);
System.setProperty("AggregateBroker", bs.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
System.setProperty("NoOpBroker", bs.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorUsingPlaceholder.xml");
// Start the uima AS client. It connects to the top level service and sends
// 10 messages
runTest(null, eeUimaEngine, System.getProperty("AggregateBroker"), "TopLevelTaeQueue",
1, PROCESS_LATCH);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
if (bs != null) {
try {
bs.stop();
bs.waitUntilStopped();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
t.start();
t.join();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Tests missing broker on service startup. The service listener on input queue recovers from this
* by silently attempting reconnect at 5 second intervals. The test first launches the service,
* than spins a thread where a new broker is created after 5 seconds, and finally the uima as
* client is started. The test shows initial connection failure and when the broker becomes
* available the connection is established and messages begin to flow from the client to the
* service and back.
*
* @throws Exception
*/
@Test
public void testDelayedBrokerWithAggregateService() throws Exception {
System.out.println("-------------- testDelayedBrokerWithAggregateService -------------");
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
BrokerService bs = null;
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// set up and start secondary broker. It is started only to detect an open port so that we
// define SecondaryBrokerURL property. This property is used to resolve a placeholder
// in the aggregate descriptor. Once the property is set we shutdown the secondary broker to
// test aggregate recovery from missing broker. Hopefully the same port is still open when
// the test starts the secondary broker for the second time.
bs = setupSecondaryBroker(false);
System.setProperty("SecondaryBrokerURL",bs.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
bs.stop();
// wait for the broker to stop
bs.waitUntilStopped();
// Deploy aggregate on a secondary broker which was shutdown above. The aggregate should
// detect missing broker and silently wait for the broker to come up
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorOnSecondaryBroker.xml");
try {
// spin a thread to restart a broker after 5 seconds
Thread t = new Thread() {
public void run() {
BrokerService bs = null;
try {
// at this point the top level service should show a connection error
synchronized (this) {
this.wait(5000); // wait for 5 secs
}
// Create a new broker on port 8119
bs = setupSecondaryBroker(false);
bs.waitUntilStarted();
System.setProperty("SecondaryBrokerURL",bs.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
// Start the uima AS client. It connects to the top level service and sends
// 10 messages
runTest(null, eeUimaEngine, bs.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "TopLevelTaeQueue", 10,
PROCESS_LATCH);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
if (bs != null) {
try {
bs.stop();
bs.waitUntilStopped();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
t.start();
t.join();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Tests a simple Aggregate with one remote Delegate and collocated Cas Multiplier
*
* @throws Exception
*/
@Test
public void testDeployAggregateServiceWithTempReplyQueue() throws Exception {
System.out.println("-------------- testDeployAggregateServiceWithTempReplyQueue -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateUsingRemoteTempQueue.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
/**
* Tests a simple Aggregate with one remote Delegate and collocated Cas Multiplier
*
* @throws Exception
*/
@Test
public void testProcessAggregateServiceWith1000Docs() throws Exception {
System.out.println("-------------- testProcessAggregateServiceWith1000Docs -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessAggregateWithInnerAggregateCM() throws Exception {
System.out.println("-------------- testProcessAggregateWithInnerAggregateCM() -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWithInnerAggregateCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testAggregateWithInnerSynchAggregateCM() throws Exception {
System.out.println("-------------- testAggregateWithInnerSynchAggregateCM() -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWithInnerUimaAggregateCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
/**
* Tests exception thrown in the Uima-AS Client when the Collection Reader is added after the uima
* ee client is initialized
*
* @throws Exception
*/
@Test
public void testExceptionOnPostInitializeCollectionReaderInjection() throws Exception {
System.out
.println("-------------- testExceptionOnPostInitializeCollectionReaderInjection -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
try {
// Simulate plugging in a Collection Reader. This should throw
// ResourceInitializationException since the client code has
// been already initialized.
eeUimaEngine.setCollectionReader(null);
} catch (ResourceInitializationException e) {
System.out.println("Received Expected Exception:" + ResourceInitializationException.class);
// Expected
return;
} catch (Exception e) {
fail("Invalid Exception Thrown. Expected:" + ResourceInitializationException.class
+ " Received:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
fail("Expected" + ResourceInitializationException.class);
}
/**
* Tests the shutdown due to a failure in the Flow Controller while diabling a delegate
*
* @throws Exception
*/
@Test
public void testTerminateOnFlowControllerExceptionOnDisable() throws Exception {
System.out
.println("-------------- testTerminateOnFlowControllerExceptionOnDisable -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithFlowControllerExceptionOnDisable.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
/**
* Tests the shutdown due to a failure in the Flow Controller when initializing
*
* @throws Exception
*/
@Test
public void testTerminateOnFlowControllerExceptionOnInitialization() throws Exception {
System.out
.println("-------------- testTerminateOnFlowControllerExceptionOnInitialization -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String[] containerIds = new String[2];
try {
containerIds[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
containerIds[1] = deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithFlowControllerExceptionOnInitialization.xml");
fail("Expected ResourceInitializationException. Instead, the Aggregate Deployed Successfully");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
System.out.println("\nExpected Initialization Exception was received:" + cause);
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.undeploy(containerIds[0]);
eeUimaEngine.undeploy(containerIds[1]);
}
}
/**
* Tests the shutdown due to a failure in the Flow Controller when initializing AND have delegates
* to disable (Jira issue UIMA-1171)
*
* @throws Exception
*/
@Test
public void testTerminateOnFlowControllerExceptionOnInitializationWithDisabledDelegates()
throws Exception {
System.out
.println("-------------- testTerminateOnFlowControllerExceptionOnInitializationWithDisabledDelegates -----");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String containerId = null;
try {
containerId = deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithFlowControllerExceptionOnInitialization.xml");
fail("Expected ResourceInitializationException. Instead, the Aggregate Deployed Successfully");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
System.out.println("\nExpected Initialization Exception was received - cause: " + cause);
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.undeploy(containerId);
}
}
/**
* Deploys a Primitive Uima-AS service and sends 5 CASes to it.
*
* @throws Exception
*/
@Test
public void testPrimitiveServiceProcess() throws Exception {
System.out.println("-------------- testPrimitiveServiceProcess -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue", 5, PROCESS_LATCH);
}
/**
* Deploys a Primitive Uima-AS service and sends 5 CASes to it.
*
* @throws Exception
*/
@Test
public void testSyncAggregateProcess() throws Exception {
System.out.println("-------------- testSyncAggregateProcess -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_MeetingDetectorAggregate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorQueue", 5, PROCESS_LATCH);
}
/**
* Deploys a Primitive Uima-AS service and sends 5 CASes to it.
*
* @throws Exception
*/
@Test
public void testPrimitiveServiceProcessPingFailure() throws Exception {
System.out.println("-------------- testPrimitiveServiceProcessPingFailure -------------");
// Instantiate Uima-AS Client
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
final String containerID = deployService(eeUimaEngine, relativePath
+ "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
// Set an explicit getMeta (Ping)timeout
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 2000);
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
// Spin a thread and wait for awhile before killing the remote service.
// This will cause the client to timeout waiting for a CAS reply and
// to send a Ping message to test service availability. The Ping times
// out and causes the client API to stop.
new Thread() {
public void run() {
Object mux = new Object();
synchronized (mux) {
try {
mux.wait(1000);
// Undeploy service container
System.out.println("** About to undeploy PersonTitle service");
eeUimaEngine.undeploy(containerID);
} catch (Exception e) {
}
}
}
}.start();
super.countPingRetries=true;
try {
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue", 1000, EXCEPTION_LATCH);
} catch (RuntimeException e) {
System.out.println(">>> runtest generated exception: " + e);
e.printStackTrace(System.out);
}
super.countPingRetries=false;
// eeUimaEngine.stop();
}
/**
* Tests error handling on delegate timeout. The Delegate is started as remote, the aggregate
* initializes and the client starts sending CASes. After a short while the client kills the
* remote delegate. The aggregate receives a CAS timeout and disables the delegate. A timed out
* CAS is sent to the next delegate in the pipeline. ALL 1000 CASes are returned to the client.
*
* @throws Exception
*/
@Test
public void testDelegateTimeoutAndDisable() throws Exception {
System.out.println("-------------- testDelegateTimeoutAndDisable -------------");
// Instantiate Uima-AS Client
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
final String containerID = deployService(eeUimaEngine, relativePath
+ "/Deploy_RoomNumberAnnotator.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_MeetingDetectorTAE_RemoteRoomNumberDisable.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorTaeQueue");
// Set an explicit getMeta (Ping)timeout
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 2000);
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
// Spin a thread and wait for awhile before killing the remote service.
// This will cause the client to timeout waiting for a CAS reply and
// to send a Ping message to test service availability. The Ping times
// out and causes the client API to stop.
new Thread() {
public void run() {
Object mux = new Object();
synchronized (mux) {
try {
mux.wait(500);
// Undeploy service container
eeUimaEngine.undeploy(containerID);
} catch (Exception e) {
}
}
}
}.start();
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorTaeQueue", 1000, PROCESS_LATCH);
}
/**
* This test kills a remote Delegate while in the middle of processing 1000 CASes. The CAS timeout
* error handling disables the delegate and forces ALL CASes from the Pending Reply List to go
* through Error Handler. The Flow Controller is configured to continueOnError and CASes that
* timed out are allowed to continue to the next delegate. ALL 1000 CASes are accounted for in the
* NoOp Annotator that is last in the flow.
*
* @throws Exception
*/
@Test
public void testDisableDelegateOnTimeoutWithCM() throws Exception {
System.out.println("-------------- testDisableDelegateOnTimeoutWithCM -------------");
// Instantiate Uima-AS Client
final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
final String containerID = deployService(eeUimaEngine, relativePath
+ "/Deploy_RoomNumberAnnotator.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_MeetingDetectorTAEWithCM_RemoteRoomNumberDisable.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorTaeQueue");
// Set an explicit getMeta (Ping)timeout
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 2000);
// Set an explicit process timeout so to test the ping on timeout
appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
// Spin a thread and wait for awhile before killing the remote service.
// This will cause the client to timeout waiting for a CAS reply and
// to send a Ping message to test service availability. The Ping times
// out and causes the client API to stop.
new Thread() {
public void run() {
Object mux = new Object();
synchronized (mux) {
try {
mux.wait(300);
// Undeploy service container
eeUimaEngine.undeploy(containerID);
} catch (Exception e) {
}
}
}
}.start();
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"MeetingDetectorTaeQueue", 1, PROCESS_LATCH);
}
/**
* Tests Uima-AS client ability to test sendAndReceive in multiple/concurrent threads It spawns 4
* thread each sending 100 CASes to a Primitive Uima-AS service
*
* @throws Exception
*/
@Test
public void testSynchCallProcessWithMultipleThreads() throws Exception {
System.out.println("-------------- testSynchCallProcessWithMultipleThreads -------------");
int howManyCASesPerRunningThread = 100;
int howManyRunningThreads = 4;
runTestWithMultipleThreads(relativePath + "/Deploy_PersonTitleAnnotator.xml",
"PersonTitleAnnotatorQueue", howManyCASesPerRunningThread, howManyRunningThreads, 0, 0);
}
/**
*
* @throws Exception
*/
@Test
public void testPrimitiveProcessCallWithLongDelay() throws Exception {
System.out.println("-------------- testPrimitiveProcessCallWithLongDelay -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
// We expect 18000ms to be spent in process method
super.setExpectedProcessTime(6000);
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueueLongDelay");
appCtx.remove(UimaAsynchronousEngine.ReplyWindow);
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 1);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueueLongDelay", 4, PROCESS_LATCH, true);
}
/**
* Tests time spent in process CAS. The CAS is sent to three remote delegates each with a delay of
* 6000ms in the process method. The aggregate is expected to sum up the time spent in each
* annotator process method. The final sum is returned to the client (the test) and compared
* against expected 18000ms. The test actually allows for 20ms margin to account for any overhead
* (garbage collecting, slow cpu, etc)
*
* @throws Exception
*/
@Test
public void testAggregateProcessCallWithLongDelay() throws Exception {
System.out.println("-------------- testAggregateProcessCallWithLongDelay -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Services each with 6000ms delay in process()
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorAWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorBWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorCWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithLongDelay.xml");
// We expect 18000ms to be spent in process method
super.setExpectedProcessTime(18000);
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.remove(UimaAsynchronousEngine.ReplyWindow);
// make sure we only send 1 CAS at a time
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 1);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, PROCESS_LATCH, true);
}
/**
* Tests Aggregate configuration where the Cas Multiplier delegate is the last delegate in the
* Aggregate's pipeline
*
* @throws Exception
*/
@Test
public void testAggregateProcessCallWithLastCM() throws Exception {
System.out.println("-------------- testAggregateProcessCallWithLastCM -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy Uima-AS Primitive Services each with 6000ms delay in process()
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithLastCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH, true);
}
/**
* Tests shutdown while running with multiple/concurrent threads The Annotator throws an exception
* and the Aggregate error handling is setup to terminate on the first error.
*
* @throws Exception
*/
@Test
public void testTimeoutInSynchCallProcessWithMultipleThreads() throws Exception {
System.out
.println("-------------- testTimeoutInSynchCallProcessWithMultipleThreads -------------");
int howManyCASesPerRunningThread = 2;
int howManyRunningThreads = 4;
int processTimeout = 2000;
int getMetaTimeout = 500;
runTestWithMultipleThreads(relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml",
"NoOpAnnotatorQueueLongDelay", howManyCASesPerRunningThread, howManyRunningThreads,
processTimeout, getMetaTimeout);
}
/**
* Tests shutdown while running with multiple/concurrent threads The Annotator throws an exception
* and the Aggregate error handling is setup to terminate on the first error.
*
* @throws Exception
*/
@Test
public void testTimeoutFailureInSynchCallProcessWithMultipleThreads() throws Exception {
System.out
.println("-------------- testTimeoutFailureInSynchCallProcessWithMultipleThreads -------------");
int howManyCASesPerRunningThread = 1000;
int howManyRunningThreads = 4;
int processTimeout = 2000;
int getMetaTimeout = 500;
runTestWithMultipleThreads(relativePath + "/Deploy_NoOpAnnotator.xml", "NoOpAnnotatorQueue",
howManyCASesPerRunningThread, howManyRunningThreads, 2000, 1000, true);
}
/**
* Tests a parallel flow in the Uima-AS aggregate.
*
* @throws Exception
*/
@Test
public void testProcessWithParallelFlow() throws Exception {
System.out.println("-------------- testProcessWithParallelFlow -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
/**
* Tests ability to disable one delegate in parallel flow and continue
*
* @throws Exception
*/
@Test
public void testDisableDelegateInParallelFlow() throws Exception {
System.out.println("-------------- testDisableDelegateInParallelFlow -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
/**
* This test launches four remote delegates and an aggregate. Three of the delegates:
* SlowNoOp1, SlowNoOp2, and NoOp are processing CASes in parallel. SlowNoOp1 and SlowNoOp2
* are configured with a very long delay that exceeds timeout values specified in the aggregate
* deployment descriptor. The SlowNoOp1 times out 3 times and is disabled,
* SlowNoOp2 times out 4 times and is disabled. The test tests ability
* of an aggregate to recover from the timeouts and subsequent disable and carry
* on processing with remaining delegates.
*
* @throws Exception
*/
@Test
public void testMutlipleDelegateTimeoutsInParallelFlows() throws Exception {
System.out.println("-------------- testTimeoutDelegateInParallelFlows -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SlowNoOpAnnotator1.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SlowNoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithSlowParallelDelegates.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.Timeout, 30000);
runTest(appCtx, eeUimaEngine, null, null, 1, PROCESS_LATCH);
}
/**
*
* @throws Exception
*/
@Test
public void testTimeoutDelegateInParallelFlows() throws Exception {
System.out.println("-------------- testTimeoutDelegateInParallelFlows -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator3.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlows.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// Set an explicit process timeout so one of the 1st parallels is disabled but 2nd parallel flow
// continues.
appCtx.put(UimaAsynchronousEngine.Timeout, 200000);
runTest(appCtx, eeUimaEngine, null, null, 1, PROCESS_LATCH);
}
/**
* Tests Timeout logic
*
* @throws Exception
*/
@Test
public void testRemoteDelegateTimeout() throws Exception {
System.out.println("-------------- testRemoteDelegateTimeout -------------");
System.out.println("The Aggregate sends 2 CASes to the NoOp Annotator which");
System.out.println("delays each CAS for 6000ms. The timeout is set to 4000ms");
System.out.println("Two CAS retries are expected");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorWithLongDelayDelegate.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// The Remote NoOp delays each CAS for 6000ms. The Aggregate sends two CASes so adjust
// client timeout to be just over 12000ms.
appCtx.put(UimaAsynchronousEngine.Timeout, 19000);
runTest(appCtx, eeUimaEngine, null, null, 1, PROCESS_LATCH);
}
/**
* Tests Timeout logic
*
* @throws Exception
*/
@Test
public void testDisableOnRemoteDelegatePingTimeout() throws Exception {
System.out.println("-------------- testDisableOnRemoteDelegatePingTimeout -------------");
System.out.println("The Aggregate sends 2 CASes to the NoOp Annotator which");
System.out.println("delays each CAS for 6000ms. The timeout is set to 4000ms");
System.out.println("Two CAS retries are expected");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String delegateContainerId = deployService(eeUimaEngine, relativePath
+ "/Deploy_NoOpAnnotatorWithLongDelay.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorWithLongDelayDelegate.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// The Remote NoOp delays each CAS for 6000ms. The Aggregate sends two CASes so adjust
// client timeout to be just over 12000ms.
appCtx.put(UimaAsynchronousEngine.Timeout, 13000);
// Remove container with the remote NoOp delegate so that we can test
// the CAS Process and Ping timeout.
eeUimaEngine.undeploy(delegateContainerId);
// Send the CAS and handle exception
runTest(appCtx, eeUimaEngine, null, null, 1, EXCEPTION_LATCH);
}
@Test
public void testDeployAggregateWithCollocatedAggregateService() throws Exception {
System.out
.println("-------------- testDeployAggregateWithCollocatedAggregateService -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregate.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
10, PROCESS_LATCH);
}
@Test
public void testProcessWithAggregateUsingCollocatedMultiplier() throws Exception {
System.out
.println("-------------- testProcessWithAggregateUsingCollocatedMultiplier -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testExistanceOfParentCasReferenceIdOnChildFailure() throws Exception {
System.out
.println("-------------- testExistanceOfParentCasReferenceIdOnChildFailure -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithDelegateFailure.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
// When a callback is received to handle the exception on the child CAS, the message should
// contain an CAS id of the parent. If it does the callback handler will set
// receivedExpectedParentReferenceId = true
Assert.assertTrue(super.receivedExpectedParentReferenceId);
}
@Test
public void testProcessWithAggregateUsingRemoteMultiplier() throws Exception {
System.out
.println("-------------- testProcessWithAggregateUsingRemoteMultiplier -------------");
System.setProperty("BrokerURL", getMasterConnectorURI(broker));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithRemoteMultiplier.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testParentProcessLast() throws Exception {
System.out
.println("-------------- testParentProcessLast -------------");
System.setProperty("BrokerURL", getMasterConnectorURI(broker));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithProcessParentLastCMs.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
/**
* Starts two remote delegates on one broker and a top level client aggregate on
* another. Tests sending Free Cas requests to the appropriate broker.
*
* @throws Exception
*/
@Test
public void testProcessWithAggregateUsingRemoteMultiplierOnSeparateBroker() throws Exception {
System.out
.println("-------------- testProcessWithAggregateUsingRemoteMultiplierOnSeparateBroker -------------");
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
BrokerService broker2 = setupSecondaryBroker(true);
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithRemoteMultiplier.xml");
Map<String, Object> appCtx = new HashMap();
appCtx.put(UimaAsynchronousEngine.ServerUri, broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
appCtx.put(UimaAsynchronousEngine.ENDPOINT, "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
runTest(appCtx, eeUimaEngine, broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(),
"TopLevelTaeQueue", 1, PROCESS_LATCH);
super.cleanBroker(broker2);
broker2.stop();
broker2.waitUntilStopped();
}
/**
* First CM feeds 100 CASes to a "merger" CM that generates one output CAS for every 5 input.
* Second CM creates unique document text that is checked by the last component. The default FC
* should let 4 childless CASes through, replacing every 5th by its child.
*
* @throws Exception
*/
@Test
public void testProcessWithAggregateUsingCollocatedMerger() throws Exception {
System.out.println("-------------- testProcessWithAggregateUsingRemoteMerger -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithCollocatedMerger.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessWithAggregateUsingRemoteMerger() throws Exception {
System.out.println("-------------- testProcessWithAggregateUsingRemoteMerger -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMerger.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithRemoteMerger.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testClientWithAggregateMultiplier() throws Exception {
System.out.println("-------------- testClientWithAggregateMultiplier -------------");
System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, PROCESS_LATCH);
/*
broker.stop();
broker.waitUntilStopped();
//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
//broker2 = setupSecondaryBroker(true);
broker = createBroker();
broker.start();
broker.waitUntilStarted();
*/
/*
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TestMultiplierQueue", 1, PROCESS_LATCH);
String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
Map<String, Object> appCtx =
buildContext(burl, "TopLevelTaeQueue");
*/
// Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
// "TopLevelTaeQueue");
/*
broker.stop();
broker.waitUntilStopped();
//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
//broker2 = setupSecondaryBroker(true);
broker = createBroker();
broker.start();
broker.waitUntilStarted();
*/
/*
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
runTest(appCtx, eeUimaEngine,burl,
"TopLevelTaeQueue", 1, PROCESS_LATCH);
eeUimaEngine.stop();
*/
}
@Test
public void testClientProcessWithRemoteMultiplier() throws Exception {
System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TestMultiplierQueue");
appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1));
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TestMultiplierQueue", 1, PROCESS_LATCH);
}
@Test
public void testClientProcessWithComplexAggregateRemoteMultiplier() throws Exception {
System.out
.println("-------------- testClientProcessWithComplexAggregateRemoteMultiplier -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_CasMultiplierAggregateWithRemoteCasMultiplier.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessWithAggregateUsing2RemoteMultipliers() throws Exception {
System.out
.println("-------------- testProcessWithAggregateUsing2RemoteMultipliers -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith10Docs_1.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith10Docs_2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWith2RemoteMultipliers.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessWithAggregateUsing2CollocatedMultipliers() throws Exception {
System.out
.println("-------------- testProcessWithAggregateUsing2CollocatedMultipliers -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWith2Multipliers.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessAggregateWithInnerCMAggregate() throws Exception {
System.out.println("-------------- testProcessAggregateWithInnerCMAggregate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_TopAggregateWithInnerAggregateCM.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testProcessAggregateWithInnerAggregateDelegateInitFailure() throws Exception {
System.out.println("-------------- testProcessAggregateWithInnerAggregateDelegateInitFailure -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_TopAggregateWithInnerAggregateDelegateInitFailure.xml");
fail("Expected ResourceInitializationException But The Deployment Succeeded Instead");
} catch( ResourceInitializationException e) {
eeUimaEngine.stop();
}
}
@Test
public void testComplexDeployment() throws Exception {
System.out.println("-------------- testComplexDeployment -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy replicated services for the inner remote aggregate CM
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Deploy an instance of a remote aggregate CM containing a collocated Cas Multiplier
// CM --> Replicated Remote Primitive --> NoOp CC
deployService(eeUimaEngine, relativePath + "/Deploy_CMAggregateWithCollocatedCM.xml");
// Deploy top level Aggregate Cas Multiplier with 2 collocated Cas Multipliers
// CM1 --> CM2 --> Remote AggregateCM --> Candidate Answer --> CC
deployService(eeUimaEngine, relativePath + "/Deploy_TopLevelComplexAggregateCM.xml");
runTest2(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 10, PROCESS_LATCH);
}
@Test
public void testTypesystemMergeWithMultiplier() throws Exception {
System.out.println("-------------- testTypesystemMergeWithMultiplier -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithMergedTypes.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testStopAggregateWithRemoteMultiplier() throws Exception {
System.out.println("-------------- testStopAggregateWithRemoteMultiplier -------------");
System.setProperty("BrokerURL", getMasterConnectorURI(broker));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithExceptionOn5thCAS.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithRemoteMultiplier.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
@Test
public void testCancelProcessAggregateWithCollocatedMultiplier() throws Exception {
System.out
.println("-------------- testCancelProcessAggregateWithCollocatedMultiplier -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWith1MillionDocs.xml");
// Spin a thread to cancel Process after 20 seconds
spinShutdownThread(eeUimaEngine, 20000);
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);
}
@Test
public void testCancelProcessAggregateWithRemoteMultiplier() throws Exception {
System.out.println("-------------- testStopAggregateWithRemoteMultiplier -------------");
System.setProperty("BrokerURL", getMasterConnectorURI(broker));
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplierWith1MillionDocs.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithRemoteMultiplier.xml");
// Spin a thread to cancel Process after 20 seconds
spinShutdownThread(eeUimaEngine, 20000);
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH);// EXCEPTION_LATCH);
}
/**
* Test correct reply from the service when its process method fails. Deploys the Primitive
* Service ( NoOp Annotator) that is configured to throw an exception on every CAS. The expected
* behavior is for the Primitive Service to return a reply with an Exception. This code blocks on
* a Count Down Latch, until the exception is returned from the service. When the exception is
* received the latch is opened indicating success.
*
* @throws Exception
*/
@Test
public void testPrimitiveServiceResponseOnException() throws Exception {
System.out.println("-------------- testPrimitiveServiceResponseOnException -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
// Deploy Uima-AS Primitive Service
// Initialize and run the Test. Wait for a completion and cleanup resources.
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue", 1, EXCEPTION_LATCH);
}
@Test
public void testProcessParallelFlowWithDelegateFailure() throws Exception {
System.out.println("-------------- testProcessParallelFlowWithDelegateFailure -------------");
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
// Deploy top level aggregate service
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithParallelFlowTerminateOnDelegateFailure.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH); // PC_LATCH);
}
/**
* Tests that the thresholdAction is taken when thresholdCount errors occur in the last
* thresholdWindow CASes. Aggregate has two annotators, first fails with increasing frequency (on
* CASes 10 19 27 34 40 45 49 52 54) and is disabled after 3 errors in a window of 7 (49,52,54)
* Second annotator counts the CASes that reach it and verifies that it sees all but the 9
* failures. It throws an exception if the first is disabled after too many or too few errors.
*
* @throws Exception
*/
@Test
public void testErrorThresholdWindow() throws Exception {
System.out.println("-------------- testErrorThresholdWindow -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithThresholdWindow.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
// Set an explicit CPC timeout as exceptions thrown in the 2nd annotator's CPC don't reach the
// client.
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 20000);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, PROCESS_LATCH); // PC_LATCH);
}
@Test
public void testProcessParallelFlowWithDelegateDisable() throws Exception {
System.out.println("-------------- testProcessParallelFlowWithDelegateDisable -------------");
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_SimpleAnnotator.xml");
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateWithParallelFlowDisableOnDelegateFailure.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, PROCESS_LATCH); // PC_LATCH);
}
@Test
public void testPrimitiveShutdownOnTooManyErrors() throws Exception {
System.out.println("-------------- testPrimitiveShutdownOnTooManyErrors -------------");
// Create Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
// Deploy top level aggregate service
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
}
/**
* Tests exception thrown in the Uima-AS Client when the Collection Reader is added after the uima
* ee client is initialized
*
* @throws Exception
*/
@Test
public void testCollectionReader() throws Exception {
System.out.println("-------------- testCollectionReader -------------");
// Instantiate Uima-AS Client
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_PersonTitleAnnotator.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"PersonTitleAnnotatorQueue");
// reduce the cas pool size and reply window
appCtx.remove(UimaAsynchronousEngine.CasPoolSize);
appCtx.put(UimaAsynchronousEngine.CasPoolSize, Integer.valueOf(2));
appCtx.remove(UimaAsynchronousEngine.ReplyWindow);
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 1);
// set the collection reader
String filename = super
.getFilepathFromClassloader("descriptors/collection_reader/ExtendedTestFileSystemCollectionReader.xml");
if (filename == null) {
fail("Unable to find file:" + "descriptors/collection_reader/ExtendedTestFileSystemCollectionReader.xml"
+ "in classloader");
}
File collectionReaderDescriptor = new File(filename);
CollectionReaderDescription collectionReaderDescription = UIMAFramework.getXMLParser()
.parseCollectionReaderDescription(new XMLInputSource(collectionReaderDescriptor));
CollectionReader collectionReader = UIMAFramework
.produceCollectionReader(collectionReaderDescription);
eeUimaEngine.setCollectionReader(collectionReader);
initialize(eeUimaEngine, appCtx);
waitUntilInitialized();
runCrTest(eeUimaEngine, 7);
synchronized (this) {
wait(50);
}
eeUimaEngine.stop();
}
@Test
public void testAsynchronousTerminate() throws Exception {
System.out.println("-------------- testAsynchronousTerminate -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithDelay.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
initialize(eeUimaEngine, appCtx);
// Wait until the top level service returns its metadata
waitUntilInitialized();
} catch( Exception e) {
throw e;
}
CAS cas = eeUimaEngine.getCAS();
System.out.println(" Sending CAS to kick off aggregate w/colocated CasMultiplier");
eeUimaEngine.sendCAS(cas);
System.out.println(" Waiting 1 seconds");
Thread.sleep(1000);
System.out.println(" Trying to stop service");
eeUimaEngine.stop();
System.out.println(" stop() returned!");
}
@Test
public void testCallbackListenerOnFailure() throws Exception {
System.out.println("-------------- testCallbackListenerOnFailure -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithException.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue");
// Register special callback listener. This listener will receive
// an exception with the Cas Reference id.
TestListener listener = new TestListener(this);
eeUimaEngine.addStatusCallbackListener(listener);
initialize(eeUimaEngine, appCtx);
// Wait until the top level service returns its metadata
waitUntilInitialized();
CAS cas = eeUimaEngine.getCAS();
// Send request out and save Cas Reference id
String casReferenceId = eeUimaEngine.sendCAS(cas);
// Spin a callback listener thread
Thread t = new Thread(listener);
t.start();
// Wait for reply CAS. This method blocks
String cRefId = listener.getCasReferenceId();
try {
// Test if received Cas Reference Id matches the id of the CAS sent out
if (!cRefId.equals(casReferenceId)) {
fail("Received Invalid Cas Reference Id. Expected:" + casReferenceId + " Received: "
+ cRefId);
} else {
System.out.println("Received Expected Cas Identifier:" + casReferenceId);
}
} finally {
// Stop callback listener thread
listener.doStop();
eeUimaEngine.stop();
}
}
@Test
public void testCauseOfInitializationFailure() throws Exception {
System.out.println("-------------- testCauseOfInitializationFailure -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithInitException.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"NoOpAnnotatorQueue");
exceptionCountLatch = new CountDownLatch(1);
initialize(eeUimaEngine, appCtx);
fail("Expected ResourceInitializationException. Instead, the Aggregate Reports Successfull Initialization");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
if ( cause != null && (cause instanceof FileNotFoundException) ) {
System.out.println("Expected FileNotFoundException was received");
} else {
fail("Expected FileNotFoundException NOT received as a cause of failure. Instead Got:" + cause);
}
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
}
@Test
public void testTerminateOnInitializationFailure() throws Exception {
System.out.println("-------------- testTerminateOnInitializationFailure -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue");
exceptionCountLatch = new CountDownLatch(1);
initialize(eeUimaEngine, appCtx);
fail("Expected ResourceInitializationException. Instead, the Aggregate Reports Successfull Initialization");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
System.out.println("Expected Initialization Exception was received:" + cause);
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
}
/**
* Tests shutdown due to delegate broker missing. The Aggregate is configured to retry getMeta 3
* times and continue. The client times out after 20 seconds and forces the shutdown. NOTE: The
* Spring listener tries to recover JMS connection on failure. In this test a Listener to remote
* delegate cannot be established due to a missing broker. The Listener is setup to retry every 60
* seconds. After failure, the listener goes to sleep for 60 seconds and tries again. This results
* in a 60 second delay at the end of this test.
*
* @throws Exception
*/
@Test
public void testTerminateOnInitializationFailureWithDelegateBrokerMissing() throws Exception {
System.out
.println("-------------- testTerminateOnInitializationFailureWithDelegateBrokerMissing -------------");
System.out
.println("---------------------- The Uima Client Times Out After 20 seconds --------------------------");
System.out
.println("-- The test requires 1 minute to complete due to 60 second delay in Spring Listener ----");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Deploy top level aggregate that communicates with the remote via Http Tunnelling
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorTerminateOnDelegateBadBrokerURL.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
Map<String, Object> appCtx = new HashMap();
appCtx.put(UimaAsynchronousEngine.ServerUri, String.valueOf(getMasterConnectorURI(broker)));
appCtx.put(UimaAsynchronousEngine.ENDPOINT, "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, EXCEPTION_LATCH);
fail("Expected ResourceInitializationException. Instead, the Aggregate Reports Successfull Initialization");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
System.out.println("Expected Initialization Exception was received:" + cause);
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
}
/**
* Tests shutdown due to delegate broker missing. The Aggregate is configured to terminate on
* getMeta timeout.
*
* @throws Exception
*/
@Test
public void testTerminateOnInitializationFailureWithAggregateForcedShutdown() throws Exception {
System.out
.println("-------------- testTerminateOnInitializationFailureWithAggregateForcedShutdown -------------");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
// Initialize and run the Test. Wait for a completion and cleanup resources.
try {
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Deploy top level aggregate that communicates with the remote via Http Tunnelling
deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotatorWithHttpDelegateNoRetries.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 10, EXCEPTION_LATCH);
fail("Expected ResourceInitializationException. Instead, the Aggregate Reports Successfull Initialization");
} catch (ResourceInitializationException e) {
Exception cause = getCause(e);
System.out.println("Expected Initialization Exception was received:" + cause);
} catch (Exception e) {
fail("Expected ResourceInitializationException. Instead Got:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
}
/**
* Tests shutdown due to delegate broker missing. The Aggregate is configured to retry getMeta 3
* times and continue. The client times out after 20 seconds and forces the shutdown. NOTE: The
* Spring listener tries to recover JMS connection on failure. In this test a Listener to remote
* delegate cannot be established due to a missing broker. The Listener is setup to retry every 60
* seconds. After failure, the listener goes to sleep for 60 seconds and tries again. This results
* in a 60 second delay at the end of this test.
*
* @throws Exception
*/
@Test
public void testDisableOnInitializationFailureWithDelegateBrokerMissing() throws Exception {
System.out
.println("-------------- testDisableOnInitializationFailureWithDelegateBrokerMissing() -------------");
System.out
.println("---------------------- The Uima Client Times Out After 20 seconds --------------------------");
System.out
.println("-- The test requires 1 minute to complete due to 60 second delay in Spring Listener ----");
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
try {
// Deploy remote service
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
// Deploy top level aggregate that communicates with the remote via Http Tunnelling
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithHttpDelegate.xml");
// Initialize and run the Test. Wait for a completion and cleanup resources.
Map<String, Object> appCtx = new HashMap();
appCtx.put(UimaAsynchronousEngine.ServerUri, String.valueOf(getMasterConnectorURI(broker)));
appCtx.put(UimaAsynchronousEngine.ENDPOINT, "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, PROCESS_LATCH);
} catch (Exception e) {
fail("Expected Success. Instead Received Exception:" + e.getClass());
} finally {
eeUimaEngine.stop();
}
}
/**
* This tests some of the error handling. Each annotator writes a file and throws an exception.
* After the CAS is processed the presence/absence of certain files indicates success or failure.
* The first annotator fails and lets the CAS proceed, so should write only one file. The second
* annotator fails and is retried 2 times, and doesn't let the CAS proceed, so should write 3
* files. The third annotator should not see the CAS, so should not write any files
*
* @throws Exception
*/
@Test
public void testContinueOnRetryFailure() throws Exception {
System.out.println("-------------- testContinueOnRetryFailure -------------");
File tempDir = new File("target/temp");
deleteAllFiles(tempDir);
try {
tempDir.mkdir();
} catch( Exception e) {
e.printStackTrace();
throw e;
}
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
deployService(eeUimaEngine, relativePath + "/Deploy_WriterAnnotatorA.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_WriterAnnotatorB.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithContinueOnRetryFailures.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
1, EXCEPTION_LATCH);
if (!(new File(tempDir, "WriterAnnotatorB.3")).exists()
|| (new File(tempDir, "WriterAnnotatorB.4")).exists()) {
fail("Second annotator should have run 3 times");
}
if ((new File(tempDir, "WriterAnnotatorC.1")).exists()) {
fail("Third annotator should not have seen CAS");
}
}
// public void testCatchExtraThreads() throws Exception {
// Thread.sleep(24 * 60 * 60 * 1000); // sleep for one day
// }
@Test
public void testDeployAgainAndAgain() throws Exception {
System.out.println("-------------- testDeployAgainAndAgain -------------");
// in the
// loop,
// no
// change.
for (int num = 1; num <= 50; num++) {
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); // here or
System.out.println("\nRunning iteration " + num);
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator2.xml");
deployService(eeUimaEngine, relativePath + "/Deploy_AggregateWithParallelFlow.xml");
runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)),
"TopLevelTaeQueue", 1, PROCESS_LATCH);
}
}
public void testMultipleASClients() throws Exception {
System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
class RunnableClient implements Runnable {
String brokerURL;
BaseTestSupport testSupport;
BaseUIMAAsynchronousEngine_impl uimaAsEngine;
String serviceEndpoint;
RunnableClient(BaseTestSupport testSupport, String brokerURL,String serviceEndpoint) {
this.brokerURL = brokerURL;
this.testSupport = testSupport;
this.serviceEndpoint = serviceEndpoint;
uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
}
public BaseUIMAAsynchronousEngine_impl getUimaAsClient() {
return uimaAsEngine;
}
public void initialize() throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
testSupport.initialize(getUimaAsClient(), appCtx);
waitUntilInitialized();
}
public void run() {
try {
initialize();
System.out.println("Thread:"+Thread.currentThread().getId()+" Completed GetMeta() broker:"+brokerURL);
} catch( Exception e) {
e.printStackTrace();
} finally {
try {
uimaAsEngine.stop();
} catch( Exception e) {
e.printStackTrace();
}
}
}
}
ExecutorService executor = Executors.newCachedThreadPool();
String serviceId1;
String serviceId2;
// change broker URl in system properties
System.setProperty("BrokerURL", getMasterConnectorURI(broker).toString());
RunnableClient client1 =
new RunnableClient(this, getMasterConnectorURI(broker), "NoOpAnnotatorQueue");
BaseUIMAAsynchronousEngine_impl engine = client1.getUimaAsClient();
serviceId1 = deployService(engine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
final BrokerService broker2 = setupSecondaryBroker(true);
// change broker URl in system properties
System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
RunnableClient client2 =
new RunnableClient(this, "failover:tcp://f5n633:51514,tcp://f12n1133:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
//new RunnableClient(this, "failover:ssl://f5n6:51514,ssl://f12n11:51514","NoOpAnnotatorQueue");//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue");
BaseUIMAAsynchronousEngine_impl engine2 = client2.getUimaAsClient();
// serviceId2 = deployService(engine2, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml");
for( int x = 0; x < 100; x++) {
List<Future<?>> list1 = new ArrayList<Future<?>>();
List<Future<?>> list2 = new ArrayList<Future<?>>();
String b;
/*
if ( x % 2 == 0 ) {
b = getMasterConnectorURI(broker);
} else {
b = "failover:ssl://f5n6:51514,ssl://f12n11:51514";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
}
*/
List<Future<?>> list = new ArrayList<Future<?>>();
for (int i = 0; i < 20; i++) {
if ( i % 2 == 0 ) {
b = getMasterConnectorURI(broker);
list = list1;
} else {
b = "failover:tcp://f5n633:51514,tcp://f12n1133:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
// b = "failover:ssl://f5n6:51514,ssl://f12n11:51514?maxReconnectAttempts=2&timeout=300&transport.maxReconnectAttempts=2&transport.timeout=300&startupMaxReconnectAttempts=1";//broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString();
list = list2;
}
RunnableClient client =
new RunnableClient(this, b, "NoOpAnnotatorQueue");
list.add(executor.submit(client));;
}
/*
for (int i = 0; i < 10; i++) {
try {
list.get(i).get();//1, TimeUnit.SECONDS);
} catch( Exception e) {
e.printStackTrace();
list.get(i).cancel(true);
}
}
*/
Worker worker1 = new Worker(list1);
Worker worker2 = new Worker(list1);
Thread t1 = new Thread(worker1);
Thread t2 = new Thread(worker2);
t1.start();
t2.start();
t1.join();
t2.join();
list.clear();
}
// engine2.undeploy(serviceId2);
engine.undeploy(serviceId1);
//engine2.stop();
executor.shutdownNow();
while( !executor.isShutdown() ) {
synchronized(broker) {
broker.wait(100);
}
}
broker2.stop();
broker2.waitUntilStopped();
//broker.stop();
//broker.waitUntilStopped();
//System.out.println("Done");
}
private Exception getCause(Throwable e) {
Exception cause = (Exception) e;
while (cause.getCause() != null) {
cause = (Exception) cause.getCause();
}
return cause;
}
/**
* This tests GetMeta retries. It deploys a simple Aggregate service that contains a collocated
* Primitive service and a Primitive remote. The Primitive remote is simulated in this code. The
* code starts a listener where the Aggregate sends GetMeta requests. The listener responds to the
* Aggregate with its metadata only when an expected number of GetMeta retries is met. If the
* Aggregate fails to send expected number of GetMeta requests, the listener will not adjust its
* CountDownLatch and will cause this test to hang.
*
* @throws Exception
*/
public void getMetaRetry() throws Exception {
getMetaCountLatch = new CountDownLatch(MaxGetMetaRetryCount);
Connection connection = getConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) session
.createQueue(primitiveServiceQueue1);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message aMessage) {
try {
if (isMetaRequest(aMessage)) {
// Reply with metadata when retry count reaches defined threshold
if (getMetaRequestCount > 0 && getMetaRequestCount % MaxGetMetaRetryCount == 0) {
JmsMessageContext msgContext = new JmsMessageContext(aMessage, primitiveServiceQueue1);
JmsOutputChannel outputChannel = new JmsOutputChannel();
outputChannel.setServiceInputEndpoint(primitiveServiceQueue1);
outputChannel.setServerURI(getBrokerUri());
Endpoint endpoint = msgContext.getEndpoint();
outputChannel.sendReply(getPrimitiveMetadata1(PrimitiveDescriptor1), endpoint, true);
}
getMetaRequestCount++;
getMetaCountLatch.countDown(); // Count down to unblock the thread
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
consumer.start();
BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl();
String containerId = deployService(eeUimaEngine, relativePath
+ "/Deploy_AggregateAnnotator.xml");
Map<String, Object> appCtx = new HashMap();
appCtx.put(UimaAsynchronousEngine.ServerUri, String.valueOf(getMasterConnectorURI(broker)));
appCtx.put(UimaAsynchronousEngine.ENDPOINT, "TopLevelTaeQueue");
appCtx.put(UimaAsynchronousEngine.CasPoolSize, Integer.valueOf(4));
appCtx.put(UimaAsynchronousEngine.ReplyWindow, 15);
appCtx.put(UimaAsynchronousEngine.Timeout, 0);
initialize(eeUimaEngine, appCtx);
System.out
.println("TestBroker.testGetMetaRetry()-Blocking On GetMeta Latch. Awaiting GetMeta Requests");
/*********************************************************************************/
/**** This Code Will Block Until Expected Number Of GetMeta Requests Arrive ******/
getMetaCountLatch.await();
/*********************************************************************************/
consumer.stop();
connection.stop();
eeUimaEngine.undeploy(containerId);
eeUimaEngine.stop();
}
public ProcessingResourceMetaData getPrimitiveMetadata1(String aDescriptor) throws Exception {
ResourceSpecifier resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
return ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData();
}
private static boolean deleteAllFiles(File directory) {
if (directory.isDirectory()) {
String[] files = directory.list();
for (int i = 0; i < files.length; i++) {
deleteAllFiles(new File(directory, files[i]));
}
}
// Now have an empty directory or simple file
return directory.delete();
}
private static class TestListener extends UimaAsBaseCallbackListener implements Runnable {
private String casReferenceId = null;
private Object monitor = new Object();
public TestListener(TestUimaASExtended aTester) {
}
public void collectionProcessComplete(EntityProcessStatus arg0) {
// TODO Auto-generated method stub
}
public void onBeforeMessageSend(UimaASProcessStatus status) {
System.out.println("TestListener Received onBeforeMessageSend Notification with Cas:"
+ status.getCasReferenceId());
}
public void entityProcessComplete(CAS aCAS, EntityProcessStatus aProcessStatus) {
if (aProcessStatus.isException()) {
if (aProcessStatus instanceof UimaASProcessStatus) {
casReferenceId = ((UimaASProcessStatus) aProcessStatus).getCasReferenceId();
if (casReferenceId != null) {
synchronized (monitor) {
monitor.notifyAll();
}
}
}
}
}
public void initializationComplete(EntityProcessStatus arg0) {
// TODO Auto-generated method stub
}
public String getCasReferenceId() {
synchronized (monitor) {
while (casReferenceId == null) {
try {
monitor.wait();
} catch (InterruptedException e) {
}
}
}
return casReferenceId;
}
public void doStop() {
}
public void run() {
System.out.println("Stopping TestListener Callback Listener Thread");
}
}
private class Worker implements Runnable {
List<Future<?>> list = new ArrayList<Future<?>>();
public Worker(List<Future<?>> list ) {
this.list = list;
}
@Override
public void run() {
for (int i = 0; i < list.size(); i++) {
try {
list.get(i).get();//1, TimeUnit.SECONDS);
} catch( Exception e) {
e.printStackTrace();
list.get(i).cancel(true);
}
}
}
}
class StartBrokerTask extends TimerTask {
BrokerService broker = null;
TestUimaASExtended testSuiteRef = null;
StartBrokerTask(BrokerService broker, TestUimaASExtended testSuiteRef) {
this.broker = broker;
this.testSuiteRef = testSuiteRef;
}
@Override
public void run() {
try {
broker = testSuiteRef.setupSecondaryBroker(true);
broker.waitUntilStarted();
System.out.println("..... Restarted broker ............................");
} catch( Exception e ) {
throw new RuntimeException(e);
}
}
}
}