blob: e37d5665bbc31797cfdfea99f031756426d6ca30 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.log;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.aurora.GuavaUtils.ServiceManagerIface;
import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
import org.apache.aurora.common.application.ShutdownStage;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
import org.apache.aurora.scheduler.log.Log.Stream;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.log.SnapshotModule.Options;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.IAnswer;
import org.junit.Test;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
/**
 * Tests snapshot behavior of the storage stack assembled from {@link SnapshotModule},
 * {@link LogPersistenceModule} and {@link DurableStorageModule}, with the {@link Log} and
 * {@link Snapshotter} replaced by mocks.
 */
public class SnapshotServiceTest extends EasyMockTest {
  private static final Snapshot SNAPSHOT = new Snapshot().setTasks(
      ImmutableSet.of(TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder()));

  /** Upper bound, in seconds, for any blocking wait so a broken service cannot hang the build. */
  private static final long AWAIT_TIMEOUT_SECS = 10L;

  private NonVolatileStorage storage;
  private SnapshotStore snapshotStore;
  private ServiceManagerIface serviceManager;
  private Snapshotter mockSnapshotter;
  private Log mockLog;
  private Stream mockStream;
  private Position mockPosition;

  /**
   * Creates the mocks and builds the injector under test.
   *
   * <p>This is intentionally not a JUnit {@code @Before} method: each test calls it explicitly
   * so that it can choose its own snapshot interval.
   *
   * @param snapshotInterval Interval copied into {@link Options#snapshotInterval}.
   */
  private void setUp(Amount<Long, Time> snapshotInterval) {
    mockSnapshotter = createMock(Snapshotter.class);
    mockLog = createMock(Log.class);
    mockStream = createMock(Stream.class);
    mockPosition = createMock(Position.class);

    Options options = new Options();
    options.snapshotInterval =
        new TimeAmount(snapshotInterval.getValue(), snapshotInterval.getUnit());

    Injector injector = Guice.createInjector(
        new SchedulerServicesModule(),
        new LogPersistenceModule(new LogPersistenceModule.Options()),
        new SnapshotModule(options),
        new DurableStorageModule(),
        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
        new TierModule(TaskTestUtil.TIER_CONFIG),
        new AbstractModule() {
          @Override
          protected void configure() {
            bind(Key.get(Command.class, ShutdownStage.class)).to(ShutdownRegistryImpl.class);
            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
            // Events are irrelevant to these tests, so they are dropped.
            bind(EventSink.class).toInstance(e -> { });
            bind(Snapshotter.class).toInstance(mockSnapshotter);
            bind(Log.class).toInstance(mockLog);
          }
        }
    );

    storage = injector.getInstance(NonVolatileStorage.class);
    snapshotStore = injector.getInstance(SnapshotStore.class);
    serviceManager =
        injector.getInstance(Key.get(ServiceManagerIface.class, SchedulerActive.class));
  }

  /**
   * Records the expectations for opening the log: one {@code open()} returning the mock stream
   * and one {@code readAll()} returning an empty iterator.
   */
  private void expectStorageInitialized() throws Exception {
    expect(mockLog.open()).andReturn(mockStream);
    List<Entry> empty = ImmutableList.of();
    expect(mockStream.readAll()).andReturn(empty.iterator());
  }

  /**
   * Records the expectations for persisting a snapshot: at least one append to the stream,
   * followed by at least one truncation before the returned position.
   *
   * @param latch Counted down once for every {@code truncateBefore} call.
   */
  private void expectSnapshotPersist(CountDownLatch latch) {
    expect(mockStream.append(anyObject())).andReturn(mockPosition).atLeastOnce();
    mockStream.truncateBefore(mockPosition);
    expectLastCall().andAnswer((IAnswer<Void>) () -> {
      latch.countDown();
      return null;
    }).atLeastOnce();
  }

  /**
   * With a 1 ms snapshot interval, starting the active services must lead to at least two
   * persisted snapshots (observed as two log truncations).
   */
  @Test
  public void testPeriodicSnapshots() throws Exception {
    setUp(Amount.of(1L, Time.MILLISECONDS));

    expectStorageInitialized();
    expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT).atLeastOnce();
    CountDownLatch snapshotCalled = new CountDownLatch(2);
    expectSnapshotPersist(snapshotCalled);

    control.replay();

    storage.prepare();
    storage.start(stores -> { });
    try {
      serviceManager.startAsync().awaitHealthy();
      // Bounded wait: if snapshots never fire the test must fail rather than hang forever.
      if (!snapshotCalled.await(AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
        throw new AssertionError(
            "Timed out waiting for periodic snapshots; outstanding: "
                + snapshotCalled.getCount());
      }
    } finally {
      // Always stop the services so no background work keeps touching the mocks after a failure.
      serviceManager.stopAsync().awaitStopped(AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
    }
  }

  /**
   * An explicit {@code snapshot()} call obtains the snapshot from the {@link Snapshotter} and
   * persists it. The interval is set to one hour, presumably so that no periodic snapshot
   * interferes; the services are never started here in any case.
   */
  @Test
  public void testExplicitInternalSnapshot() throws Exception {
    setUp(Amount.of(1L, Time.HOURS));

    expectStorageInitialized();
    expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT);
    // The latch is not awaited: the snapshot is triggered directly from the test thread.
    expectSnapshotPersist(new CountDownLatch(1));

    control.replay();

    storage.prepare();
    storage.start(stores -> { });
    snapshotStore.snapshot();
  }

  /**
   * An explicit {@code snapshotWith()} call persists the supplied snapshot. No expectation is
   * recorded on the {@link Snapshotter}, so the test expects it not to be consulted.
   */
  @Test
  public void testExplicitProvidedSnapshot() throws Exception {
    setUp(Amount.of(1L, Time.HOURS));

    expectStorageInitialized();
    // The latch is not awaited: the snapshot is triggered directly from the test thread.
    expectSnapshotPersist(new CountDownLatch(1));

    control.replay();

    storage.prepare();
    storage.start(stores -> { });
    snapshotStore.snapshotWith(SNAPSHOT);
  }
}