blob: bb714ee8c0cc32b2ea1b4867b0f4ede56fe5be6b [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.aurora.common.testing.TearDownTestCase;
import org.apache.aurora.gen.AppcImage;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.Container;
import org.apache.aurora.gen.DockerImage;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Image;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.MesosContainer;
import org.apache.aurora.gen.MesosFetcherURI;
import org.apache.aurora.gen.Metadata;
import org.apache.aurora.gen.Mode;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskQuery;
import org.apache.aurora.gen.Volume;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public abstract class AbstractTaskStoreTest extends TearDownTestCase {
protected static final IHostAttributes HOST_A = IHostAttributes.build(
new HostAttributes(
"hostA",
ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a"))))
.setSlaveId("slaveIdA")
.setMode(MaintenanceMode.NONE));
protected static final IHostAttributes HOST_B = IHostAttributes.build(
new HostAttributes(
"hostB",
ImmutableSet.of(new Attribute("zone", ImmutableSet.of("1a"))))
.setSlaveId("slaveIdB")
.setMode(MaintenanceMode.NONE));
protected static final IScheduledTask TASK_A = createTask("a");
protected static final IScheduledTask TASK_B =
setContainer(createTask("b"), Container.mesos(new MesosContainer()));
protected static final IScheduledTask TASK_C = createTask("c");
protected static final IScheduledTask TASK_D = createTask("d");
protected Injector injector;
protected Storage storage;
protected abstract Module getStorageModule();
@Before
public void setUp() {
injector = Guice.createInjector(getStorageModule());
storage = injector.getInstance(Storage.class);
storage.prepare();
storage.write((NoResult.Quiet) storeProvider -> {
AttributeStore.Mutable attributeStore = storeProvider.getAttributeStore();
attributeStore.saveHostAttributes(HOST_A);
attributeStore.saveHostAttributes(HOST_B);
});
}
private Optional<IScheduledTask> fetchTask(String taskId) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
}
private Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
}
protected void saveTasks(IScheduledTask... tasks) {
saveTasks(ImmutableSet.copyOf(tasks));
}
private void saveTasks(Set<IScheduledTask> tasks) {
storage.write((NoResult.Quiet)
storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.copyOf(tasks)));
}
private Optional<IScheduledTask> mutateTask(String taskId, TaskMutation mutation) {
return storage.write(
storeProvider -> storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
}
protected void deleteTasks(String... taskIds) {
storage.write((NoResult.Quiet) storeProvider ->
storeProvider.getUnsafeTaskStore().deleteTasks(ImmutableSet.copyOf(taskIds)));
}
protected void deleteAllTasks() {
storage.write(
(NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
}
@Test
public void testSave() {
IScheduledTask aWithHost = setHost(TASK_A, HOST_A);
StorageEntityUtil.assertFullyPopulated(aWithHost.newBuilder());
saveTasks(aWithHost, TASK_B);
assertStoreContents(aWithHost, TASK_B);
saveTasks(TASK_C, TASK_D);
assertStoreContents(aWithHost, TASK_B, TASK_C, TASK_D);
// Saving the same task should overwrite.
IScheduledTask taskAModified = IScheduledTask.build(aWithHost.newBuilder().setStatus(RUNNING));
saveTasks(taskAModified);
assertStoreContents(taskAModified, TASK_B, TASK_C, TASK_D);
}
@Test
public void testSaveWithMetadata() {
ScheduledTask builder = TASK_A.newBuilder();
builder.getAssignedTask().getTask().setMetadata(
ImmutableSet.of(
new Metadata("package", "a"),
new Metadata("package", "b")));
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
assertStoreContents(task);
}
@Test
public void testSaveWithMesosFetcherUris() {
ScheduledTask builder = TASK_A.newBuilder();
builder.getAssignedTask().getTask().setMesosFetcherUris(
ImmutableSet.of(
new MesosFetcherURI("pathA").setExtract(true).setCache(true),
new MesosFetcherURI("pathB").setExtract(true).setCache(true)));
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
assertStoreContents(task);
}
@Test
public void testSaveWithContainerVolumes() {
ScheduledTask builder = TASK_B.newBuilder();
Image image = Image.docker(new DockerImage().setName("some-name").setTag("some-tag"));
List<Volume> volumes = ImmutableList.of(
new Volume("container2", "host2", Mode.RW),
new Volume("container", "host", Mode.RO));
builder.getAssignedTask().getTask().getContainer().getMesos().setImage(image)
.setVolumes(volumes);
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
assertStoreContents(task);
}
@Test
public void testSaveWithDockerImage() {
ScheduledTask builder = TASK_A.newBuilder();
Image image = new Image();
image.setDocker(new DockerImage().setName("some-name").setTag("some-tag"));
builder.getAssignedTask().getTask().setContainer(
Container.mesos(new MesosContainer().setImage(image)));
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
assertStoreContents(task);
}
@Test
public void testSaveWithAppcImage() {
ScheduledTask builder = TASK_A.newBuilder();
Image image = new Image();
image.setAppc(new AppcImage().setName("some-name").setImageId("some-tag"));
builder.getAssignedTask().getTask().setContainer(
Container.mesos(new MesosContainer().setImage(image)));
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
assertStoreContents(task);
}
@Test
public void testSaveWithMultiplePorts() {
ScheduledTask builder = TASK_A.newBuilder();
builder.getAssignedTask().setAssignedPorts(ImmutableMap.of("http", 1000, "tcp", 2000));
IScheduledTask task = IScheduledTask.build(builder);
saveTasks(task);
Optional<IScheduledTask> maybeTask = fetchTask(task.getAssignedTask().getTaskId());
saveTasks(maybeTask.get());
assertStoreContents(task);
}
@Test
public void testQuery() {
assertStoreContents();
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(Query.taskScoped("b"), TASK_B);
assertQueryResults(Query.taskScoped("a", "d"), TASK_A, TASK_D);
assertQueryResults(Query.roleScoped("role-c"), TASK_C);
assertQueryResults(Query.envScoped("role-c", "env-c"), TASK_C);
assertQueryResults(Query.envScoped("role-c", "devel"));
assertQueryResults(
Query.unscoped().byStatus(ASSIGNED),
TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(
Query.instanceScoped(JobKeys.from("role-a", "env-a", "job-a"), 2).active(), TASK_A);
assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "env-b", "job-b")).active(), TASK_B);
assertQueryResults(Query.jobScoped(JobKeys.from("role-b", "devel", "job-b")).active());
// Explicitly call out the matching behaviors for different means of expressing empty query
// conditions. Specifically - neither null collections nor empty collections should trigger
// query restrictions and thus they should yield the same results.
assertQueryResults(new TaskQuery().setTaskIds(null), TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setTaskIds(ImmutableSet.of()),
TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setInstanceIds(null), TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(
new TaskQuery().setInstanceIds(ImmutableSet.of()),
TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setStatuses(null), TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(
new TaskQuery().setStatuses(ImmutableSet.of()),
TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setSlaveHosts(null), TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setSlaveHosts(ImmutableSet.of()),
TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setJobKeys(null), TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(new TaskQuery().setJobKeys(ImmutableSet.of()),
TASK_A, TASK_B, TASK_C, TASK_D);
}
@Test
public void testQueryMultipleInstances() {
ImmutableSet.Builder<IScheduledTask> tasksBuilder = ImmutableSet.builder();
for (int i = 0; i < 100; i++) {
ScheduledTask builder = TASK_A.newBuilder();
builder.getAssignedTask()
.setTaskId("id" + i)
.setInstanceId(i);
tasksBuilder.add(IScheduledTask.build(builder));
}
Set<IScheduledTask> tasks = tasksBuilder.build();
saveTasks(tasks);
assertQueryResults(Query.unscoped(), tasks);
}
@Test
public void testQueryBySlaveHost() {
IScheduledTask a = setHost(makeTask("a", JobKeys.from("role", "env", "job")), HOST_A);
IScheduledTask b = setHost(makeTask("b", JobKeys.from("role", "env", "job")), HOST_B);
saveTasks(a, b);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a);
assertQueryResults(Query.slaveScoped(HOST_A.getHost(), HOST_B.getHost()), a, b);
}
@Test
public void testQueryByJobKeys() {
assertStoreContents();
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(
Query.jobScoped(ImmutableSet.of(
JobKeys.from("role-a", "env-a", "job-a"),
JobKeys.from("role-b", "env-b", "job-b"),
JobKeys.from("role-c", "env-c", "job-c"))),
TASK_A, TASK_B, TASK_C);
// Conflicting jobs will produce the result from the last added JobKey
assertQueryResults(
Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a"))
.byJobKeys(ImmutableSet.of(JobKeys.from("role-b", "env-b", "job-b"))),
TASK_B);
// The .byJobKeys will override the previous scoping and OR all of the keys.
assertQueryResults(
Query.jobScoped(JobKeys.from("role-a", "env-a", "job-a"))
.byJobKeys(ImmutableSet.of(
JobKeys.from("role-b", "env-b", "job-b"),
JobKeys.from("role-a", "env-a", "job-a"))),
TASK_A, TASK_B);
// Combination of individual field and jobKeys is allowed.
assertQueryResults(
Query.roleScoped("role-b")
.byJobKeys(ImmutableSet.of(
JobKeys.from("role-b", "env-b", "job-b"),
JobKeys.from("role-a", "env-a", "job-a"))),
TASK_B);
}
@Test
public void testMutate() {
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
assertQueryResults(Query.statusScoped(RUNNING));
mutateTask("a", task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
assertQueryResults(
Query.statusScoped(RUNNING),
IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING)));
assertEquals(
Optional.empty(),
mutateTask(
"nonexistent",
task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING))));
assertStoreContents(
IScheduledTask.build(TASK_A.newBuilder().setStatus(RUNNING)),
IScheduledTask.build(TASK_B.newBuilder().setStatus(ASSIGNED)),
IScheduledTask.build(TASK_C.newBuilder().setStatus(ASSIGNED)),
IScheduledTask.build(TASK_D.newBuilder().setStatus(ASSIGNED)));
}
@Test
public void testDelete() {
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
deleteTasks("a");
assertStoreContents(TASK_B, TASK_C, TASK_D);
deleteTasks("c");
assertStoreContents(TASK_B, TASK_D);
deleteTasks("b", "d");
assertStoreContents();
}
@Test
public void testDeleteAll() {
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
deleteAllTasks();
assertStoreContents();
}
@Test
public void testConsistentJobIndex() {
IScheduledTask a = makeTask("a", JobKeys.from("jim", "test", "job"));
IScheduledTask b = makeTask("b", JobKeys.from("jim", "test", "job"));
IScheduledTask c = makeTask("c", JobKeys.from("jim", "test", "job2"));
IScheduledTask d = makeTask("d", JobKeys.from("joe", "test", "job"));
IScheduledTask e = makeTask("e", JobKeys.from("jim", "prod", "job"));
Query.Builder jimsJob = Query.jobScoped(JobKeys.from("jim", "test", "job"));
Query.Builder jimsJob2 = Query.jobScoped(JobKeys.from("jim", "test", "job2"));
Query.Builder joesJob = Query.jobScoped(JobKeys.from("joe", "test", "job"));
saveTasks(a, b, c, d, e);
assertQueryResults(jimsJob, a, b);
assertQueryResults(jimsJob2, c);
assertQueryResults(joesJob, d);
deleteTasks(Tasks.id(b));
assertQueryResults(jimsJob, a);
assertQueryResults(jimsJob2, c);
assertQueryResults(joesJob, d);
mutateTask(Tasks.id(a), task -> IScheduledTask.build(task.newBuilder().setStatus(RUNNING)));
IScheduledTask aRunning = IScheduledTask.build(a.newBuilder().setStatus(RUNNING));
assertQueryResults(jimsJob, aRunning);
assertQueryResults(jimsJob2, c);
assertQueryResults(joesJob, d);
deleteTasks(Tasks.id(d));
assertQueryResults(joesJob);
deleteTasks(Tasks.id(d));
assertQueryResults(jimsJob, aRunning);
assertQueryResults(jimsJob2, c);
assertQueryResults(joesJob);
saveTasks(b);
assertQueryResults(jimsJob, aRunning, b);
assertQueryResults(jimsJob2, c);
assertQueryResults(joesJob);
}
@Test
public void testCanonicalTaskConfigs() {
IScheduledTask a = createTask("a");
IScheduledTask b = createTask("a");
IScheduledTask c = createTask("a");
saveTasks(a, b, c);
Set<IScheduledTask> inserted = ImmutableSet.of(a, b, c);
Set<ITaskConfig> storedConfigs = FluentIterable.from(fetchTasks(Query.unscoped()))
.transform(Tasks::getConfig)
.toSet();
assertEquals(
FluentIterable.from(inserted).transform(Tasks::getConfig).toSet(),
storedConfigs);
Map<ITaskConfig, ITaskConfig> identityMap = Maps.newIdentityHashMap();
for (ITaskConfig stored : storedConfigs) {
identityMap.put(stored, stored);
}
assertEquals(
ImmutableMap.of(Tasks.getConfig(a), Tasks.getConfig(a)),
identityMap);
}
private static IScheduledTask setHost(IScheduledTask task, IHostAttributes host) {
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask()
.setSlaveHost(host.getHost())
.setSlaveId(host.getSlaveId());
return IScheduledTask.build(builder);
}
private static IScheduledTask unsetHost(IScheduledTask task) {
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask()
.setSlaveHost(null)
.setSlaveId(null);
return IScheduledTask.build(builder);
}
private static IScheduledTask setConfigData(IScheduledTask task, String configData) {
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask().getTask().getExecutorConfig().setData(configData);
return IScheduledTask.build(builder);
}
@Test
public void testAddSlaveHost() {
final IScheduledTask a = createTask("a");
saveTasks(a);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
final IScheduledTask b = setHost(a, HOST_A);
Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
task -> {
assertEquals(a, task);
return b;
});
assertEquals(Optional.of(b), result);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), b);
// Unrealistic behavior, but proving that the secondary index can handle key mutations.
final IScheduledTask c = setHost(b, HOST_B);
Optional<IScheduledTask> result2 = mutateTask(Tasks.id(a),
task -> {
assertEquals(b, task);
return c;
});
assertEquals(Optional.of(c), result2);
assertQueryResults(Query.slaveScoped(HOST_B.getHost()), c);
deleteTasks(Tasks.id(a));
assertQueryResults(Query.slaveScoped(HOST_B.getHost()));
}
@Test
public void testUnsetSlaveHost() {
// Unrealistic behavior, but proving that the secondary index does not become stale.
final IScheduledTask a = setHost(createTask("a"), HOST_A);
saveTasks(a);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a);
final IScheduledTask b = unsetHost(a);
Optional<IScheduledTask> result = mutateTask(Tasks.id(a),
task -> {
assertEquals(a, task);
return b;
});
assertEquals(Optional.of(b), result);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()));
assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
}
@Test
public void testTasksOnSameHost() {
final IScheduledTask a = setHost(createTask("a"), HOST_A);
final IScheduledTask b = setHost(createTask("b"), HOST_A);
saveTasks(a, b);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), a, b);
}
@Test
public void testSaveOverwrites() {
// Ensures that saving a task with an existing task ID is effectively the same as a mutate,
// and does not result in a duplicate object in the primary or secondary index.
final IScheduledTask a = setHost(createTask("a"), HOST_A);
saveTasks(a);
final IScheduledTask updated = setConfigData(a, "new config data");
saveTasks(updated);
assertQueryResults(Query.taskScoped(Tasks.id(a)), updated);
assertQueryResults(Query.slaveScoped(HOST_A.getHost()), updated);
}
private Set<IJobKey> getJobKeys() {
return storage.read(storeProvider -> storeProvider.getTaskStore().getJobKeys());
}
private Set<IJobKey> toJobKeys(IScheduledTask... tasks) {
return FluentIterable.from(ImmutableSet.copyOf(tasks))
.transform(Tasks::getJob)
.toSet();
}
@Test
public void testGetsJobKeys() {
assertEquals(ImmutableSet.of(), getJobKeys());
saveTasks(TASK_A);
assertEquals(toJobKeys(TASK_A), getJobKeys());
saveTasks(TASK_B, TASK_C);
assertEquals(toJobKeys(TASK_A, TASK_B, TASK_C), getJobKeys());
deleteTasks(Tasks.id(TASK_B));
assertEquals(toJobKeys(TASK_A, TASK_C), getJobKeys());
IJobKey multiInstanceJob = JobKeys.from("role", "env", "instances");
saveTasks(
makeTask("instance1", multiInstanceJob),
makeTask("instance2", multiInstanceJob),
makeTask("instance3", multiInstanceJob));
assertEquals(
ImmutableSet.builder().addAll(toJobKeys(TASK_A, TASK_C)).add(multiInstanceJob).build(),
getJobKeys());
}
@Ignore
@Test
public void testReadSecondaryIndexMultipleThreads() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4,
new ThreadFactoryBuilder().setNameFormat("SlowRead-%d").setDaemon(true).build());
try {
ImmutableSet.Builder<IScheduledTask> builder = ImmutableSet.builder();
final int numTasks = 100;
final int numJobs = 100;
for (int j = 0; j < numJobs; j++) {
for (int t = 0; t < numTasks; t++) {
builder.add(makeTask("" + j + "-" + t, JobKeys.from("role", "env", "name" + j)));
}
}
saveTasks(builder.build());
final CountDownLatch read = new CountDownLatch(numJobs);
for (int j = 0; j < numJobs; j++) {
final int id = j;
executor.submit(() -> {
assertNotNull(fetchTasks(Query.jobScoped(JobKeys.from("role", "env", "name" + id))));
read.countDown();
});
executor.submit(() -> saveTasks(createTask("TaskNew1" + id)));
}
read.await();
} finally {
MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS);
}
}
@Test
public void testNullVsEmptyRelations() throws Exception {
// Test for regression of AURORA-1476.
ITaskConfig nullMetadata =
ITaskConfig.build(TaskTestUtil.makeConfig(TaskTestUtil.JOB).newBuilder().setMetadata(null));
IScheduledTask a = makeTask("a", nullMetadata);
IScheduledTask b = makeTask("a", nullMetadata);
IScheduledTask c = makeTask("a", nullMetadata);
saveTasks(a);
saveTasks(b);
saveTasks(c);
}
@Test
public void testConcurrentFetchTasks() throws Exception {
// Test for regression of AURORA-1625
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
assertStoreContents();
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
List<ListenableFuture<Integer>> futures = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
futures.add(executor.submit(() -> Iterables.size(fetchTasks(Query.unscoped()))));
}
Future<List<Integer>> f = Futures.allAsList(futures);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
assertEquals(Iterables.getOnlyElement(ImmutableSet.copyOf(f.get())), (Integer) 4);
}
private void assertStoreContents(IScheduledTask... tasks) {
assertQueryResults(Query.unscoped(), tasks);
}
private void assertQueryResults(TaskQuery query, IScheduledTask... tasks) {
assertQueryResults(Query.arbitrary(query), tasks);
}
private void assertQueryResults(Query.Builder query, IScheduledTask... tasks) {
assertQueryResults(query, ImmutableSet.copyOf(tasks));
}
private void assertQueryResults(Query.Builder query, Set<IScheduledTask> tasks) {
Iterable<IScheduledTask> result = fetchTasks(query);
assertQueryHasNoDupes(result);
Set<IScheduledTask> set = ImmutableSet.copyOf(result);
assertEquals(tasks, set);
}
private void assertQueryHasNoDupes(Iterable<IScheduledTask> result) {
assertEquals(ImmutableList.copyOf(result).size(), ImmutableSet.copyOf(result).size());
}
private static IScheduledTask createTask(String id) {
return makeTask(id, JobKeys.from("role-" + id, "env-" + id, "job-" + id));
}
private static IScheduledTask setContainer(IScheduledTask task, Container container) {
ScheduledTask builder = task.newBuilder();
builder.getAssignedTask().getTask().setContainer(container);
return IScheduledTask.build(builder);
}
}