blob: 14f9704edfb52142742687524fcfb70362ca8369 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.providers.direct;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.execution.Configs;
import org.apache.edgent.execution.Job;
import org.apache.edgent.graph.Vertex;
import org.apache.edgent.oplet.Oplet;
import org.apache.edgent.oplet.core.PeriodicSource;
import org.apache.edgent.oplet.core.Pipe;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.test.topology.TopologyAbstractTest;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.junit.Test;
import com.google.gson.JsonObject;
public class DirectJobTest extends TopologyAbstractTest implements DirectTestSetup {
/**
 * A job submitted without any configuration is expected to get a name
 * that begins with the name of its topology.
 */
@Test
public void jobName0() throws Exception {
    final String expectedPrefix = "topoName";
    Topology topology = newTopology(expectedPrefix);
    topology.strings(new String[0]);

    Job completed = awaitCompleteExecution(topology);

    String actualName = completed.getName();
    assertTrue(actualName.startsWith(expectedPrefix));
}
/**
 * A job submitted with {@link Configs#JOB_NAME} explicitly set to {@code null}
 * is expected to get a name that begins with the name of its topology.
 */
@Test
public void jobName1() throws Exception {
    final String expectedPrefix = "topoName";
    Topology topology = newTopology(expectedPrefix);
    topology.strings(new String[0]);

    // The cast selects the String overload of addProperty for the null value.
    JsonObject nullNameConfig = new JsonObject();
    nullNameConfig.addProperty(Configs.JOB_NAME, (String) null);

    Job completed = awaitCompleteExecution(topology, nullNameConfig);

    String actualName = completed.getName();
    assertTrue(actualName.startsWith(expectedPrefix));
}
/**
 * A job submitted with {@link Configs#JOB_NAME} set is expected to report
 * exactly that name.
 */
@Test
public void jobName2() throws Exception {
    final String requestedName = "testJob";
    Topology topology = newTopology();
    topology.strings(new String[0]);

    JsonObject namedConfig = new JsonObject();
    namedConfig.addProperty(Configs.JOB_NAME, requestedName);

    Job completed = awaitCompleteExecution(topology, namedConfig);

    assertEquals(requestedName, completed.getName());
}
/**
 * Runs a topology whose only source has no tuples and checks the job's
 * reported state, health and last error, both after execution completes
 * and after the job is explicitly closed.
 */
@Test
public void jobDone0() throws Exception {
    String[] data = new String[] {};
    Topology t = newTopology();
    t.strings(data);

    Job job = awaitCompleteExecution(t);

    // After completion the test expects the job to still report RUNNING until it is closed.
    assertEquals("job.getCurrentState() must be RUNNING", Job.State.RUNNING, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());

    job.stateChange(Job.Action.CLOSE);

    assertEquals("job.getCurrentState() must be CLOSED", Job.State.CLOSED, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
}
/**
 * Runs a topology with a three-tuple string source and checks the job's
 * reported state, health and last error, both after execution completes
 * and after the job is explicitly closed.
 */
@Test
public void jobDone1() throws Exception {
    String[] data = new String[] {"a", "b", "c"};
    Topology t = newTopology();
    t.strings(data);

    Job job = awaitCompleteExecution(t);

    // After completion the test expects the job to still report RUNNING until it is closed.
    assertEquals("job.getCurrentState() must be RUNNING", Job.State.RUNNING, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());

    job.stateChange(Job.Action.CLOSE);

    assertEquals("job.getCurrentState() must be CLOSED", Job.State.CLOSED, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
}
/**
 * Runs a topology over a one-million-element collection with a counting
 * sink, then checks that every tuple reached the sink and that the job's
 * state, health and last error are as expected before and after closing.
 */
@Test
public void jobDone2() throws Exception {
    final int NUM_TUPLES = 1000000;
    Integer[] data = new Integer[NUM_TUPLES];
    AtomicInteger numTuples = new AtomicInteger();
    for (int i = 0; i < data.length; i++) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        data[i] = Integer.valueOf(i);
    }

    Topology t = newTopology();
    TStream<Integer> ints = t.collection(Arrays.asList(data));
    ints.sink(tuple -> numTuples.incrementAndGet());

    Job job = awaitCompleteExecution(t);
    Thread.sleep(1500); // wait for numTuples visibility
    assertEquals(NUM_TUPLES, numTuples.get());

    assertEquals("job.getCurrentState() must be RUNNING", Job.State.RUNNING, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());

    job.stateChange(Job.Action.CLOSE);

    assertEquals("job.getCurrentState() must be CLOSED", Job.State.CLOSED, job.getCurrentState());
    assertEquals("job.getHealth() must be HEALTHY", Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
}
/**
 * Submits a topology with a source polled every 100 ms, lets it run for
 * two seconds, and checks that at least one poll happened before the job
 * is closed.
 */
@Test
public void jobPeriodicSource() throws Exception {
    Topology topology = newTopology();
    AtomicInteger pollCount = new AtomicInteger(0);
    topology.poll(() -> pollCount.incrementAndGet(), 100, TimeUnit.MILLISECONDS);

    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(topology);
    Job running = pending.get();
    assertEquals(Job.State.RUNNING, running.getCurrentState());

    long runMillis = TimeUnit.SECONDS.toMillis(2);
    Thread.sleep(runMillis);
    // At least one tuple was processed
    assertTrue(pollCount.get() > 0);

    running.stateChange(Job.Action.CLOSE);
    assertEquals(Job.State.CLOSED, running.getCurrentState());
}
/**
 * Checks that the period of a running {@link PeriodicSource} can be changed.
 * The source starts with a 500 ms period; after one second the period is
 * set to 100 ms, and after another second the test expects the total tuple
 * count to exceed three times the count observed before the change.
 */
@Test
public void jobPeriodicSourceCancellation() throws Exception {
    Topology t = newTopology();
    AtomicInteger n = new AtomicInteger(0);
    t.poll(() -> n.incrementAndGet(), 500, TimeUnit.MILLISECONDS);

    // Get the source oplet; every vertex of this graph must be a PeriodicSource.
    Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = t.graph().getVertices();
    PeriodicSource<?> src = null;
    for (Vertex<? extends Oplet<?, ?>, ?, ?> v : vertices) {
        Oplet<?, ?> op = v.getInstance();
        assertTrue(op instanceof PeriodicSource);
        src = (PeriodicSource<?>) op;
        assertEquals(500, src.getPeriod());
        assertSame(TimeUnit.MILLISECONDS, src.getUnit());
    }
    // Fail with a clear message instead of a NullPointerException at setPeriod below
    // if the graph turned out to have no vertices.
    assertTrue("Topology graph must contain a PeriodicSource vertex", src != null);

    // Submit job
    Future<Job> fj = ((DirectProvider) getTopologyProvider()).submit(t);
    Job job = fj.get();
    assertEquals(Job.State.RUNNING, job.getCurrentState());

    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    int tupleCount = n.get();
    assertTrue("Expected more tuples than " + tupleCount, tupleCount > 0); // At least one tuple was processed
    assertEquals(Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());

    // Changing the period cancels the source's task and schedules new one
    src.setPeriod(100);
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));

    // More tuples processed after resetting the period. The counter is read once so
    // that the value in the failure message is the value that was actually compared.
    int tupleCountAfter = n.get();
    assertTrue("Expected more than " + (3 * tupleCount) + " tuples but got " + tupleCountAfter,
            tupleCountAfter > 3 * tupleCount);

    job.stateChange(Job.Action.CLOSE);
    assertEquals(Job.State.CLOSED, job.getCurrentState());
    assertEquals(Job.Health.HEALTHY, job.getHealth());
    assertEquals("", job.getLastError());
}
/**
 * Submits a topology with a continuously generating source, lets it run
 * for two seconds, and checks state, health and last error both while the
 * job runs and after it is closed.
 */
@Test
public void jobProcessSource() throws Exception {
    Topology topology = newTopology();
    AtomicInteger generated = new AtomicInteger(0);
    topology.generate(() -> generated.incrementAndGet());

    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(topology);
    Job running = pending.get();

    assertEquals(Job.State.RUNNING, running.getCurrentState());
    assertEquals(Job.Health.HEALTHY, running.getHealth());
    assertEquals("", running.getLastError());

    long runMillis = TimeUnit.SECONDS.toMillis(2);
    Thread.sleep(runMillis);
    // At least one tuple was processed
    assertTrue(generated.get() > 0);

    running.stateChange(Job.Action.CLOSE);

    assertEquals(Job.State.CLOSED, running.getCurrentState());
    assertEquals(Job.Health.HEALTHY, running.getHealth());
    assertEquals("", running.getLastError());
}
/**
 * Expects {@code Job.complete(700 ms)} on a job with a 100 ms polling source
 * to throw {@link TimeoutException}, and checks in the {@code finally} block
 * that the job is still RUNNING and HEALTHY with no recorded error.
 */
@Test(expected = TimeoutException.class)
public void jobTimesOut() throws Exception {
    Topology topology = newTopology();
    AtomicInteger pollCount = new AtomicInteger(0);
    topology.poll(() -> pollCount.incrementAndGet(), 100, TimeUnit.MILLISECONDS);

    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(topology);
    Job running = pending.get();
    assertEquals(Job.State.RUNNING, running.getCurrentState());

    try {
        running.complete(700, TimeUnit.MILLISECONDS);
    } finally {
        // These checks run whether or not complete() threw.
        // At least one tuple was processed
        assertTrue(pollCount.get() > 0);
        assertEquals(Job.State.RUNNING, running.getCurrentState());
        assertEquals(Job.Health.HEALTHY, running.getHealth());
        assertEquals("", running.getLastError());
    }
}
/**
 * Pipes a 100 ms polling source into a {@code FailedOplet} configured with
 * threshold 5 and no sleep, and expects {@code Job.complete} to throw
 * {@link ExecutionException}. The {@code finally} block checks that the
 * job is RUNNING but UNHEALTHY and reports the injected error.
 */
@Test(expected = ExecutionException.class)
public void jobPeriodicSourceError() throws Exception {
    Topology topology = newTopology();
    AtomicInteger pollCount = new AtomicInteger(0);
    TStream<Integer> polled = topology.poll(() -> pollCount.incrementAndGet(), 100, TimeUnit.MILLISECONDS);
    polled.pipe(new FailedOplet<Integer>(5, 0));

    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(topology);
    Job running = pending.get();
    assertEquals(Job.State.RUNNING, running.getCurrentState());

    try {
        running.complete(10, TimeUnit.SECONDS);
    } finally {
        // RUNNING even though execution error
        assertEquals(Job.State.RUNNING, running.getCurrentState());
        assertEquals(Job.Health.UNHEALTHY, running.getHealth());
        assertEquals("java.lang.RuntimeException: Expected Test Exception", running.getLastError());
    }
}
/**
 * Pipes a continuously generating source into a {@code FailedOplet}
 * configured with threshold 12 and a 100 ms per-tuple sleep, and expects
 * {@code Job.complete} to throw {@link ExecutionException}. The
 * {@code finally} block checks that the job is RUNNING but UNHEALTHY and
 * reports the injected error.
 */
@Test(expected = ExecutionException.class)
public void jobProcessSourceError() throws Exception {
    Topology topology = newTopology();
    AtomicInteger generatedCount = new AtomicInteger(0);
    TStream<Integer> generated = topology.generate(() -> generatedCount.incrementAndGet());
    generated.pipe(new FailedOplet<Integer>(12, 100));

    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(topology);
    Job running = pending.get();
    assertEquals(Job.State.RUNNING, running.getCurrentState());

    try {
        running.complete(10, TimeUnit.SECONDS);
    } finally {
        // RUNNING even though execution error
        assertEquals(Job.State.RUNNING, running.getCurrentState());
        assertEquals(Job.Health.UNHEALTHY, running.getHealth());
        assertEquals("java.lang.RuntimeException: Expected Test Exception", running.getLastError());
    }
}
/**
 * Submits the topology with no job configuration and waits for it to complete.
 *
 * @param t topology to submit
 * @return the job after {@code complete()} has returned
 */
private Job awaitCompleteExecution(Topology t) throws InterruptedException, ExecutionException {
    final JsonObject noConfig = null;
    return awaitCompleteExecution(t, noConfig);
}
/**
 * Submits the topology to the {@link DirectProvider} with the given
 * configuration, waits for the submission future, then waits for the job
 * to complete.
 *
 * @param t topology to submit
 * @param config job configuration passed to {@code submit}; callers in this class may pass {@code null}
 * @return the job after {@code complete()} has returned
 */
private Job awaitCompleteExecution(Topology t, JsonObject config) throws InterruptedException, ExecutionException {
    DirectProvider provider = (DirectProvider) getTopologyProvider();
    Future<Job> pending = provider.submit(t, config);
    Job submitted = pending.get();
    submitted.complete();
    return submitted;
}
/**
 * Test oplet that forwards tuples unchanged but throws a
 * {@link RuntimeException} once, on the tuple received after
 * {@code afterTuples} earlier tuples have been counted.
 *
 * @param <T> tuple type, both consumed and submitted
 */
@SuppressWarnings("serial")
private static class FailedOplet<T> extends Pipe<T,T> {
    /** Zero-based index of the tuple on which the error is injected. */
    private final int threshold;
    /** Delay applied before each tuple is handled; 0 means no delay. */
    private final int sleepMillis;
    /** Number of tuples seen so far. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Create test oplet.
     *
     * @param afterTuples number of tuples to receive before failing
     * @param sleepMillis milliseconds of sleep before processing each tuple
     * @throws IllegalArgumentException if either argument is negative
     */
    public FailedOplet(int afterTuples, int sleepMillis) {
        if (afterTuples < 0) {
            throw new IllegalArgumentException("afterTuples="+afterTuples);
        }
        if (sleepMillis < 0) {
            throw new IllegalArgumentException("sleepMillis="+sleepMillis);
        }
        this.threshold = afterTuples;
        this.sleepMillis = sleepMillis;
    }

    /** Nothing to release. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Optionally sleeps, then either throws the injected error or submits
     * the tuple downstream.
     */
    @Override
    public void accept(T tuple) {
        boolean delayRequested = sleepMillis > 0;
        if (delayRequested) {
            sleep(sleepMillis);
        }
        injectError(threshold);
        submit(tuple);
    }

    /**
     * Increments the tuple counter and throws when its previous value
     * equals {@code errorAt}.
     *
     * @param errorAt counter value at which to throw
     */
    protected void injectError(int errorAt) {
        int seenBefore = count.getAndIncrement();
        if (seenBefore == errorAt) {
            throw new RuntimeException("Expected Test Exception");
        }
    }

    /**
     * Sleeps for the given time; if interrupted, restores the thread's
     * interrupt flag and returns early.
     *
     * @param millis time to sleep in milliseconds
     */
    protected static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
}