blob: 75afb74f1402b247b7d6c2977a28686299f81f82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class ResultQueueTest extends AbstractResultQueueTest {
private static final Map<String,Object> ATTRIBUTES = new HashMap<String,Object>() {{
put("this", "that");
}};
@Test
public void shouldGetSizeUntilError() throws Exception {
final Thread t = addToQueue(100, 10, true, false, 1);
try {
assertThat(resultQueue.size(), is(greaterThan(0)));
assertThat(readCompleted.isDone(), is(false));
final Exception theProblem = new Exception();
resultQueue.markError(theProblem);
assertThat(readCompleted.isDone(), is(true));
try {
resultQueue.size();
fail("Should have thrown an exception");
} catch (Exception ex) {
assertEquals(theProblem, ex.getCause());
}
} finally {
t.interrupt();
}
}
@Test
public void shouldBeEmptyThenNotEmpty() {
assertThat(resultQueue.isEmpty(), is(true));
resultQueue.add(new Result("test"));
assertThat(resultQueue.isEmpty(), is(false));
}
@Test
public void shouldNotBeEmptyUntilError() throws Exception {
final Thread t = addToQueue(100, 10, true, false, 1);
try {
assertThat(resultQueue.isEmpty(), is(false));
assertThat(readCompleted.isDone(), is(false));
final Exception theProblem = new Exception();
resultQueue.markError(theProblem);
assertThat(readCompleted.isDone(), is(true));
try {
resultQueue.isEmpty();
fail("Should have thrown an exception");
} catch (Exception ex) {
assertEquals(theProblem, ex.getCause());
}
} finally {
t.interrupt();
}
}
@Test
public void shouldDrainUntilError() throws Exception {
final Thread t = addToQueue(100, 10, true, false, 1);
try {
assertThat(resultQueue.isEmpty(), is(false));
final List<Result> drain = new ArrayList<>();
resultQueue.drainTo(drain);
assertThat(drain.size(), is(greaterThan(0)));
assertThat(readCompleted.isDone(), is(false));
// make sure some more items get added to the queue before assert
TimeUnit.MILLISECONDS.sleep(100);
assertThat(resultQueue.isEmpty(), is(false));
assertThat(readCompleted.isDone(), is(false));
final Exception theProblem = new Exception();
resultQueue.markError(theProblem);
assertThat(readCompleted.isDone(), is(true));
try {
resultQueue.drainTo(new ArrayList<>());
fail("Should have thrown an exception");
} catch (Exception ex) {
assertEquals(theProblem, ex.getCause());
}
} finally {
t.interrupt();
}
}
@Test
public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws Exception {
final CompletableFuture<List<Result>> future = resultQueue.await(4);
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
resultQueue.add(new Result("test3"));
assertThat(future.isDone(), is(false));
resultQueue.markComplete(ATTRIBUTES);
assertThat(future.isDone(), is(true));
final List<Result> results = future.get();
assertEquals("test1", results.get(0).getString());
assertEquals("test2", results.get(1).getString());
assertEquals("test3", results.get(2).getString());
assertEquals(3, results.size());
assertThat(resultQueue.isEmpty(), is(true));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldAwaitFailTheFutureOnMarkError() throws Exception {
final CompletableFuture<List<Result>> future = resultQueue.await(4);
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
resultQueue.add(new Result("test3"));
assertThat(future.isDone(), is(false));
resultQueue.markError(new Exception("no worky"));
assertThat(future.isDone(), is(true));
try {
future.get();
} catch (Exception ex) {
final Throwable t = ExceptionUtils.getRootCause(ex);
assertEquals("no worky", t.getMessage());
}
}
@Test
public void shouldAwaitToExpectedValueAndDrainOnAdd() throws Exception {
final CompletableFuture<List<Result>> future = resultQueue.await(3);
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
// shouldn't complete until the third item is in play
assertThat(future.isDone(), is(false));
resultQueue.add(new Result("test3"));
final List<Result> results = future.get();
assertEquals("test1", results.get(0).getString());
assertEquals("test2", results.get(1).getString());
assertEquals("test3", results.get(2).getString());
assertEquals(3, results.size());
assertThat(resultQueue.isEmpty(), is(true));
}
@Test
public void shouldAwaitMultipleToExpectedValueAndDrainOnAdd() throws Exception {
final CompletableFuture<List<Result>> future1 = resultQueue.await(3);
final CompletableFuture<List<Result>> future2 = resultQueue.await(1);
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
// shouldn't complete the first future until the third item is in play
assertThat(future1.isDone(), is(false));
assertThat(future2.isDone(), is(false));
resultQueue.add(new Result("test3"));
final List<Result> results1 = future1.get();
assertEquals("test1", results1.get(0).getString());
assertEquals("test2", results1.get(1).getString());
assertEquals("test3", results1.get(2).getString());
assertEquals(3, results1.size());
assertThat(future1.isDone(), is(true));
assertThat(future2.isDone(), is(false));
resultQueue.add(new Result("test4"));
assertThat(future1.isDone(), is(true));
assertThat(future2.isDone(), is(true));
final List<Result> results2 = future2.get();
assertEquals("test4", results2.get(0).getString());
assertEquals(1, results2.size());
assertThat(resultQueue.isEmpty(), is(true));
}
@Test
public void shouldAwaitToExpectedValueAndDrainOnAwait() throws Exception {
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
resultQueue.add(new Result("test3"));
final CompletableFuture<List<Result>> future = resultQueue.await(3);
assertThat(future.isDone(), is(true));
final List<Result> results = future.get();
assertEquals("test1", results.get(0).getString());
assertEquals("test2", results.get(1).getString());
assertEquals("test3", results.get(2).getString());
assertEquals(3, results.size());
assertThat(resultQueue.isEmpty(), is(true));
}
@Test
public void shouldAwaitToReadCompletedAndDrainOnAwait() throws Exception {
resultQueue.add(new Result("test1"));
resultQueue.add(new Result("test2"));
resultQueue.add(new Result("test3"));
resultQueue.markComplete(ATTRIBUTES);
// you might want 30 but there are only three
final CompletableFuture<List<Result>> future = resultQueue.await(30);
assertThat(future.isDone(), is(true));
final List<Result> results = future.get();
assertEquals("test1", results.get(0).getString());
assertEquals("test2", results.get(1).getString());
assertEquals("test3", results.get(2).getString());
assertEquals(3, results.size());
assertThat(resultQueue.isEmpty(), is(true));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldDrainAsItemsArrive() throws Exception {
final Thread t = addToQueue(1000, 1, true);
try {
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(3);
resultQueue.await(500).thenAcceptAsync(r -> {
count1.set(r.size());
latch.countDown();
});
resultQueue.await(150).thenAcceptAsync(r -> {
count2.set(r.size());
latch.countDown();
});
resultQueue.await(350).thenAcceptAsync(r -> {
count3.set(r.size());
latch.countDown();
});
assertThat(latch.await(10000, TimeUnit.MILLISECONDS), is(true));
assertEquals(500, count1.get());
assertEquals(150, count2.get());
assertEquals(350, count3.get());
assertThat(resultQueue.isEmpty(), is(true));
} finally {
t.interrupt();
}
}
@Test
public void shouldHandleBulkSetSideEffects() throws Exception {
final CompletableFuture<List<Result>> o = resultQueue.await(1);
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new DefaultRemoteTraverser<>("brian", 2));
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new DefaultRemoteTraverser<>("brian", 2));
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new DefaultRemoteTraverser<>("belinda", 6));
assertThat(o.isDone(), is(false));
resultQueue.markComplete(ATTRIBUTES);
assertThat(o.isDone(), is(true));
final BulkSet<String> bulkSet = o.get().get(0).get(BulkSet.class);
assertEquals(4, bulkSet.get("brian"));
assertEquals(6, bulkSet.get("belinda"));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldHandleListSideEffects() throws Exception {
final CompletableFuture<List<Result>> o = resultQueue.await(1);
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "dave");
assertThat(o.isDone(), is(false));
resultQueue.markComplete(ATTRIBUTES);
assertThat(o.isDone(), is(true));
final List<String> list = o.get().get(0).get(ArrayList.class);
assertEquals("stephen", list.get(0));
assertEquals("daniel", list.get(1));
assertEquals("dave", list.get(2));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldHandleSetSideEffects() throws Exception {
final CompletableFuture<List<Result>> o = resultQueue.await(1);
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_SET, "stephen");
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_SET, "daniel");
assertThat(o.isDone(), is(false));
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_SET, "dave");
assertThat(o.isDone(), is(false));
resultQueue.markComplete(ATTRIBUTES);
assertThat(o.isDone(), is(true));
final Set<String> set = o.get().get(0).get(HashSet.class);
assertThat(set.contains("stephen"), is(true));
assertThat(set.contains("daniel"), is(true));
assertThat(set.contains("dave"), is(true));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldHandleMapSideEffects() throws Exception {
final CompletableFuture<List<Result>> o = resultQueue.await(1);
assertThat(o.isDone(), is(false));
final Map<String,String> m = new HashMap<>();
m.put("s", "stephen");
m.put("m", "marko");
m.put("d", "daniel");
m.entrySet().forEach(e -> {
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_MAP, e);
assertThat(o.isDone(), is(false));
});
resultQueue.markComplete(ATTRIBUTES);
assertThat(o.isDone(), is(true));
final Map<String, String> list = o.get().get(0).get(HashMap.class);
assertEquals("stephen", list.get("s"));
assertEquals("daniel", list.get("d"));
assertEquals("marko", list.get("m"));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
@Test
public void shouldHandleNotAggregateSideEffects() throws Exception {
final CompletableFuture<List<Result>> o = resultQueue.await(1);
assertThat(o.isDone(), is(false));
final Map<String,String> m = new HashMap<>();
m.put("s", "stephen");
m.put("m", "marko");
m.put("d", "daniel");
resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_NONE, m);
resultQueue.markComplete(ATTRIBUTES);
assertThat(o.isDone(), is(true));
final Map<String, String> list = o.get().get(0).get(HashMap.class);
assertEquals("stephen", list.get("s"));
assertEquals("daniel", list.get("d"));
assertEquals("marko", list.get("m"));
assertEquals("that", resultQueue.getStatusAttributes().get("this"));
}
}