blob: 534a912e099e844e67c96674dd1254a8106eb5a4 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.app;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import org.apache.aurora.GuavaUtils;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.Stats;
import org.apache.aurora.common.zookeeper.Credentials;
import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.SaveFrameworkId;
import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
import org.apache.aurora.scheduler.log.Log.Stream;
import org.apache.aurora.scheduler.maintenance.MaintenanceController;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.log.EntrySerializer;
import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
import org.apache.aurora.scheduler.storage.log.SnapshotModule;
import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.v1.Protos.FrameworkInfo;
import org.apache.mesos.v1.Protos.Resource;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Integration test that assembles the scheduler's Guice modules around a mocked
 * {@link SchedulerDriver}, a mocked {@link DriverFactory} and a mocked {@link Log}, runs
 * {@link SchedulerMain} on a background thread against the ZooKeeper server supplied by
 * {@link BaseZooKeeperTest}, and verifies that tasks replayed from the log show up in the
 * task store stats.
 */
public class SchedulerIT extends BaseZooKeeperTest {
  private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);

  private static final String CLUSTER_NAME = "integration_test_cluster";
  private static final String SERVERSET_PATH = "/fake/service/path";
  private static final String STATS_URL_PREFIX = "fake_url";
  private static final String FRAMEWORK_ID = "integration_test_framework_id";

  // Upper bound on the time awaitSchedulerReady spends sleeping, and the pause between polls.
  private static final long READY_TIMEOUT_MS = 5000;
  private static final long READY_POLL_INTERVAL_MS = 100;

  private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder()
      .setId("master-id")
      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
      .setPort(5050).build();

  private static final IHostAttributes HOST_ATTRIBUTES = IHostAttributes.build(new HostAttributes()
      .setHost("host")
      .setSlaveId("slave-id")
      .setMode(MaintenanceMode.NONE)
      .setAttributes(ImmutableSet.of()));

  private static final FrameworkInfo BASE_INFO = FrameworkInfo.newBuilder()
      .setUser("framework user")
      .setName("test framework")
      .build();

  private static final DriverSettings SETTINGS = new DriverSettings(
      "fakemaster",
      Optional.empty());

  // Runs SchedulerMain and the readiness poller; daemon threads so a stuck task cannot keep the
  // JVM alive.
  private final ExecutorService executor = Executors.newCachedThreadPool(
      new ThreadFactoryBuilder().setNameFormat("SchedulerIT-%d").setDaemon(true).build());

  // Holds a RuntimeException thrown by SchedulerMain.run on the background thread, if any.
  private final AtomicReference<Optional<RuntimeException>> mainException =
      Atomics.newReference(Optional.empty());

  private IMocksControl control;
  private SchedulerDriver driver;
  private DriverFactory driverFactory;
  private Log log;
  private Stream logStream;
  private StreamMatcher streamMatcher;
  private EntrySerializer entrySerializer;
  private File backupDir;

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  /**
   * Creates the mocks and fixtures shared by the tests and registers a tear-down that fails the
   * test if the scheduler main thread died with a {@link RuntimeException}, then verifies all
   * mocks.
   *
   * @throws Exception if the temporary backup directory cannot be created.
   */
  @Before
  public void mySetUp() throws Exception {
    control = createControl();
    addTearDown(() -> {
      Optional<RuntimeException> failure = mainException.get();
      if (failure.isPresent()) {
        RuntimeException e = failure.get();
        LOG.error("Scheduler main exited with an exception", e);
        // Report the exception itself: getMessage() alone may be null and drops the type.
        fail("Scheduler main exited with an exception: " + e);
      }
      control.verify();
    });

    backupDir = temporaryFolder.newFolder();
    driver = control.createMock(SchedulerDriver.class);
    // This is necessary to allow driver to block, otherwise it would stall other mocks.
    EasyMock.makeThreadSafe(driver, false);
    driverFactory = control.createMock(DriverFactory.class);
    log = control.createMock(Log.class);
    logStream = control.createMock(Stream.class);
    streamMatcher = LogOpMatcher.matcherFor(logStream);
    entrySerializer = new EntrySerializer.EntrySerializerImpl(
        Amount.of(512, Data.KB),
        Hashing.md5());
  }

  /**
   * Builds the injector from the scheduler modules plus test bindings, starts
   * {@link SchedulerMain#run} on the background executor, and waits for the
   * {@link AppStartup} service manager to become healthy.
   *
   * <p>A tear-down is registered that shuts down the {@link Lifecycle} and the executor.
   *
   * @return the injector the scheduler was started from.
   * @throws Exception if the scheduler main instance cannot be created or start-up fails.
   */
  private Injector startScheduler() throws Exception {
    // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
    // AppLauncher.
    Module testModule = new AbstractModule() {
      @Override
      protected void configure() {
        bind(DriverFactory.class).toInstance(driverFactory);
        bind(FrameworkInfoFactory.class).toInstance(() -> BASE_INFO);
        bind(DriverSettings.class).toInstance(SETTINGS);
        bind(Log.class).toInstance(log);
        Set<Resource> overhead = ImmutableSet.of(
            mesosScalar(CPUS, 0.1),
            mesosScalar(RAM_MB, 1));
        bind(ExecutorSettings.class)
            .toInstance(TestExecutorSettings.thermosOnlyWithOverhead(overhead));
        BackupModule.Options backupOptions = new BackupModule.Options();
        backupOptions.backupDir = backupDir;
        install(new BackupModule(backupOptions, SnapshotterImpl.class));

        bind(IServerInfo.class).toInstance(
            IServerInfo.build(
                new ServerInfo()
                    .setClusterName(CLUSTER_NAME)
                    .setStatsUrlPrefix(STATS_URL_PREFIX)));
        bind(new TypeLiteral<Amount<Long, Time>>() { })
            .annotatedWith(
                MaintenanceController.MaintenanceControllerImpl.PollingInterval.class)
            .toInstance(new TimeAmount(1, Time.MINUTES));
      }
    };

    // Point service discovery at the ZooKeeper server started by the base test class.
    ZooKeeperConfig zkClientConfig =
        ZooKeeperConfig.create(
            ImmutableList.of(
                InetSocketAddress.createUnresolved("localhost", getServer().getPort())))
            .withCredentials(Credentials.digestCredentials("mesos", "mesos"));

    // Class.newInstance() is deprecated and rethrows checked constructor exceptions undeclared;
    // going through the Constructor is the supported replacement.
    SchedulerMain main = SchedulerMain.class.getDeclaredConstructor().newInstance();
    Injector injector = Guice.createInjector(
        ImmutableList.<Module>builder()
            .add(SchedulerMain.getUniversalModule(new CliOptions()))
            .add(new TierModule(TaskTestUtil.TIER_CONFIG))
            .add(new DurableStorageModule())
            .add(new LogPersistenceModule(new LogPersistenceModule.Options()))
            .add(new SnapshotModule(new SnapshotModule.Options()))
            .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH))
            .add(testModule)
            .build()
    );
    injector.injectMembers(main);
    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);

    executor.submit(() -> {
      try {
        main.run(new SchedulerMain.Options());
      } catch (RuntimeException e) {
        // Remember the failure for the tear-down check and interrupt any other executor tasks.
        mainException.set(Optional.of(e));
        executor.shutdownNow();
      }
    });
    addTearDown(() -> {
      lifecycle.shutdown();
      MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
    });
    injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
        .awaitHealthy();
    return injector;
  }

  /**
   * Polls the injector's {@code ServiceGroupMonitor} on the background executor until it reports
   * a non-empty group, {@link #READY_TIMEOUT_MS} of sleep time has accumulated, or the polling
   * thread is interrupted. The monitor is closed afterwards.
   *
   * <p>The method returns normally in all three cases; reaching the timeout is not treated as a
   * failure here.
   *
   * @param injector injector to obtain the {@code ServiceGroupMonitor} from.
   * @throws Exception if the polling task fails or waiting for it is interrupted.
   */
  private void awaitSchedulerReady(Injector injector) throws Exception {
    executor.submit(() -> {
      ServiceGroupMonitor groupMonitor = injector.getInstance(ServiceGroupMonitor.class);
      try {
        // A timeout is used because certain types of assertion errors (mocks) will not surface
        // until the main test thread exits this body of code.
        long waited = 0;
        while (waited < READY_TIMEOUT_MS && groupMonitor.get().isEmpty()) {
          try {
            Thread.sleep(READY_POLL_INTERVAL_MS);
            waited += READY_POLL_INTERVAL_MS;
          } catch (InterruptedException e) {
            // Restore the flag and stop polling. Staying in the loop would make every
            // subsequent sleep throw immediately without advancing the timeout, spinning for
            // as long as the group stays empty.
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        try {
          groupMonitor.close();
        } catch (IOException e) {
          LOG.info("Failed to close:" + e, e);
        }
      }
    }).get();
  }

  /**
   * Serializes each log entry with the test's {@link EntrySerializer} and exposes the first
   * serialized chunk of each as a {@link Entry}.
   *
   * @param entries log entries to convert.
   * @return a lazily transformed view yielding one {@link Entry} per input entry.
   */
  private Iterable<Entry> toEntries(LogEntry... entries) {
    return Iterables.transform(Arrays.asList(entries),
        entry -> {
          try {
            byte[] data = Iterables.getFirst(entrySerializer.serialize(entry), null);
            return (Entry) () -> data;
          } catch (CodingException e) {
            throw Throwables.propagate(e);
          }
        });
  }

  /**
   * Builds a task for {@link TaskTestUtil#JOB} with a state transition to {@code status} and
   * assigns it to the host described by {@link #HOST_ATTRIBUTES}.
   *
   * @param id task ID.
   * @param status status passed to {@link TaskTestUtil#addStateTransition}.
   * @return the immutable scheduled task.
   */
  private static IScheduledTask makeTask(String id, ScheduleStatus status) {
    ScheduledTask builder = TaskTestUtil.addStateTransition(
        TaskTestUtil.makeTask(id, TaskTestUtil.JOB),
        status,
        100)
        .newBuilder();
    builder.getAssignedTask()
        .setSlaveId(HOST_ATTRIBUTES.getSlaveId())
        .setSlaveHost(HOST_ATTRIBUTES.getHost());
    return IScheduledTask.build(builder);
  }

  /**
   * Starts the scheduler with a log containing one snapshotted ASSIGNED task and one RUNNING
   * task saved in a transaction, registers the framework, and checks the per-status task store
   * stats.
   */
  @Test
  public void testLaunch() throws Exception {
    Capture<Scheduler> scheduler = createCapture();
    expect(driverFactory.create(
        capture(scheduler),
        eq(SETTINGS.getCredentials()),
        eq(BASE_INFO),
        eq(SETTINGS.getMasterUri())))
        .andReturn(driver).anyTimes();

    IScheduledTask snapshotTask = makeTask("snapshotTask", ScheduleStatus.ASSIGNED);
    IScheduledTask transactionTask = makeTask("transactionTask", ScheduleStatus.RUNNING);
    // Log contents handed back by the mocked stream: a snapshot followed by a transaction.
    Iterable<Entry> recoveredEntries = toEntries(
        LogEntry.snapshot(new Snapshot()
            .setTasks(ImmutableSet.of(snapshotTask.newBuilder()))
            .setHostAttributes(ImmutableSet.of(HOST_ATTRIBUTES.newBuilder()))),
        LogEntry.transaction(new Transaction(
            ImmutableList.of(Op.saveTasks(
                new SaveTasks(ImmutableSet.of(transactionTask.newBuilder())))),
            storageConstants.CURRENT_SCHEMA_VERSION)));

    expect(log.open()).andReturn(logStream);
    expect(logStream.readAll()).andReturn(recoveredEntries.iterator()).anyTimes();
    // Expect a transaction that saves the framework ID handed over in registered() below.
    streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(FRAMEWORK_ID)))
        .andReturn(new Position() { });

    CountDownLatch driverStarted = new CountDownLatch(1);
    expect(driver.start()).andAnswer(() -> {
      driverStarted.countDown();
      return Protos.Status.DRIVER_RUNNING;
    });

    // Try to be a good test suite citizen by releasing the blocked thread when the test case exits.
    CountDownLatch testCompleted = new CountDownLatch(1);
    expect(driver.join()).andAnswer(() -> {
      testCompleted.await();
      return Protos.Status.DRIVER_STOPPED;
    });
    addTearDown(testCompleted::countDown);
    expect(driver.stop(true)).andReturn(Protos.Status.DRIVER_STOPPED).anyTimes();

    control.replay();

    Injector injector = startScheduler();
    driverStarted.await();
    // Deliver the registration callback to the Scheduler captured from the driver factory.
    scheduler.getValue().registered(
        driver,
        Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
        MASTER);

    awaitSchedulerReady(injector);

    assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
    assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());
    assertEquals(1L, Stats.<Long>getVariable("task_store_RUNNING").read().longValue());

    // TODO(William Farner): Send a thrift RPC to the scheduler.
    // TODO(William Farner): Also send an admin thrift RPC to verify capability (e.g. ROOT) mapping.
  }
}