blob: d109b0a799b97562d0535e9413a3b23ff081b446 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.apps.runtime;
import static org.junit.Assert.assertTrue;
import java.util.Hashtable;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.apps.runtime.JobMonitorApp;
import org.apache.edgent.execution.DirectSubmitter;
import org.apache.edgent.execution.Job;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.execution.services.ServiceContainer;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.runtime.appservice.AppService;
import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
import org.apache.edgent.runtime.jobregistry.JobRegistry;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.services.ApplicationService;
import org.junit.Test;
public class JobMonitorAppTest {
public final static String MONITORED_APP_NAME_1 = "MonitoredApp_1";
public final static String MONITORED_APP_NAME_2 = "MonitoredApp_2";
@Test
public void testJobMonitorApp() throws Exception {
DirectProvider provider = new DirectProvider();
startProvider(provider);
// Start monitor app
JobMonitorApp app = new JobMonitorApp(provider, provider, JobMonitorApp.APP_NAME);
Job monitor = app.submit();
// Declare and register user apps which need monitoring
AtomicInteger appOneBuildCnt = new AtomicInteger();
AtomicInteger appOneInjectedErrorCnt = new AtomicInteger();
AtomicInteger appTwoBuildCnt = new AtomicInteger();
AtomicInteger appTwoInjectedErrorCnt = new AtomicInteger();
registerMonitoredApplicationOne(provider, appOneBuildCnt, appOneInjectedErrorCnt);
registerMonitoredApplicationTwo(provider, appTwoBuildCnt, appTwoInjectedErrorCnt);
// Start monitored apps
startMonitoredApplications(provider);
// Run for a while, assert the monitor app is still running healthy
Thread.sleep(5000);
assertTrue(
Job.State.RUNNING.equals(monitor.getCurrentState()) &&
Job.State.RUNNING.equals(monitor.getNextState()) &&
Job.Health.HEALTHY.equals(monitor.getHealth()));
// Verify app restarts.
// Ideally, each app should be started (rebuilt) exactly
// once initially + once for each injected failure.
// Given the timing vagueness of this test allow for
// a little wiggle room - allow one fewer than expected.
int appOneExpectedBuildCnt = 1 + appOneInjectedErrorCnt.get();
int appTwoExpectedBuildCnt = 1 + appTwoInjectedErrorCnt.get();
int appOneActBuildCnt = appOneBuildCnt.get();
int appTwoActBuildCnt = appTwoBuildCnt.get();
System.out.println("appOne: actBuildCnt: " + appOneActBuildCnt + " expBuildCnt: "+ appOneExpectedBuildCnt);
System.out.println("appTwo: actBuildCnt: " + appTwoActBuildCnt + " expBuildCnt: "+ appTwoExpectedBuildCnt);
assertTrue("appOne",
appOneActBuildCnt > 1
&& appOneActBuildCnt >= appOneExpectedBuildCnt - 1
&& appOneActBuildCnt <= appOneExpectedBuildCnt);
assertTrue("appTwo",
appTwoActBuildCnt > 1
&& appTwoActBuildCnt >= appTwoExpectedBuildCnt - 1
&& appTwoActBuildCnt <= appTwoExpectedBuildCnt);
}
static void startProvider(DirectProvider provider)
throws InterruptedException, ExecutionException {
provider.getServices().addService(ControlService.class,
new JMXControlService("org.apache.edgent.test.apps.runtime", new Hashtable<>()));
AppService.createAndRegister(provider, provider);
JobRegistry.createAndRegister(provider.getServices());
}
/**
* Fails every 2 seconds (20 tuples * 100 millis)
*/
static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
topoBuiltCnt.incrementAndGet();
Random r = new Random();
TStream<Double> d = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
final AtomicInteger count = new AtomicInteger(0);
d = d.filter(tuple -> {
int tupleCount = count.incrementAndGet();
if (tupleCount == 20) {
injectedErrorCnt.incrementAndGet();
throw new IllegalStateException(MONITORED_APP_NAME_1 + " Injected error " + injectedErrorCnt.get());
}
return true;
});
d.sink(tuple -> System.out.print("."));
});
}
/**
* Fails every 1.5 seconds (10 tuples * 150 millis)
*/
static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter, AtomicInteger topoBuiltCnt, AtomicInteger injectedErrorCnt) {
ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
topoBuiltCnt.incrementAndGet();
Random r = new Random();
TStream<Double> d = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
final AtomicInteger count = new AtomicInteger(0);
d = d.filter(tuple -> {
int tupleCount = count.incrementAndGet();
if (tupleCount == 10) {
injectedErrorCnt.incrementAndGet();
throw new IllegalStateException(MONITORED_APP_NAME_2 + " Injected error " + injectedErrorCnt.get());
}
return true;
});
d.sink(tuple -> System.out.print("#"));
});
}
static void startMonitoredApplications(DirectSubmitter<Topology, Job> submitter) {
ServiceContainer services = submitter.getServices();
ApplicationService appService = services.getService(ApplicationService.class);
ControlService controlService = services.getService(ControlService.class);
// Submit all applications registered with the ApplicationService
for (String name: appService.getApplicationNames()) {
JobMonitorApp.submitApplication(name, controlService);
}
}
}