blob: a84e40895612c2b82ccdc7e0244418b610d9e7b2 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.log;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.BuildInfo;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.common.util.testing.FakeBuildInfo;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Stream;
import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class LogPersistenceTest extends EasyMockTest {
private Persistence persistence;
private Log mockLog;
private Stream mockStream;
@Before
public void setUp() {
mockLog = createMock(Log.class);
mockStream = createMock(Stream.class);
Injector injector = Guice.createInjector(
new LogPersistenceModule(new Options()),
new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
new TierModule(TaskTestUtil.TIER_CONFIG),
new AbstractModule() {
@Override
protected void configure() {
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(EventSink.class).toInstance(e -> { });
bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
bind(Clock.class).toInstance(new FakeClock());
bind(Snapshotter.class).to(SnapshotterImpl.class);
bind(Log.class).toInstance(mockLog);
}
}
);
persistence = injector.getInstance(Persistence.class);
}
@Test
public void testRecoverEmpty() throws Exception {
expect(mockLog.open()).andReturn(mockStream);
List<Entry> empty = ImmutableList.of();
expect(mockStream.readAll()).andReturn(empty.iterator());
control.replay();
persistence.prepare();
assertEquals(ImmutableList.of(), persistence.recover().collect(Collectors.toList()));
}
@Test
public void testRecoverSnapshot() throws Exception {
expect(mockLog.open()).andReturn(mockStream);
Op saveA = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder())));
Op saveB = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
TaskTestUtil.makeTask("b", TaskTestUtil.JOB).newBuilder())));
Op saveC = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
TaskTestUtil.makeTask("c", TaskTestUtil.JOB).newBuilder())));
List<Entry> entries = ImmutableList.of(
logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveA)))),
logEntry(LogEntry.snapshot(new Snapshot().setTasks(saveB.getSaveTasks().getTasks()))),
logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveC)))));
expect(mockStream.readAll()).andReturn(entries.iterator());
control.replay();
persistence.prepare();
assertEquals(
ImmutableList.of(
Edit.op(saveA),
Edit.deleteAll(),
Edit.op(saveB),
Edit.op(saveC)),
persistence.recover().collect(Collectors.toList()));
}
private static Entry logEntry(LogEntry entry) {
return () -> ThriftBinaryCodec.encodeNonNull(entry);
}
}