blob: 25fd315e74a5f15e2262ac624c57825bea75ad5f [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Inject;
import javax.inject.Qualifier;
import javax.inject.Singleton;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import org.apache.aurora.common.util.StateMachine;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import static java.util.Objects.requireNonNull;
/**
 * Decorator for a {@link NonVolatileStorage} that tracks the storage lifecycle and verifies that
 * calls arrive in the expected order: {@link #prepare()}, then {@link #start(Quiet)}, and only
 * afterwards {@link #read(Work)} and {@link #write(MutateWork)}.
 */
public class CallOrderEnforcingStorage implements NonVolatileStorage {

  /**
   * Binding qualifier that marks the underlying storage to be wrapped by this class.
   */
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ ElementType.PARAMETER, ElementType.METHOD })
  @Qualifier
  private @interface EnforceOrderOn { }

  /**
   * Lifecycle phases tracked for the wrapped storage.
   */
  private enum State {
    CONSTRUCTED,
    PREPARED,
    READY,
    STOPPED
  }

  private final NonVolatileStorage wrapped;
  private final EventSink eventSink;

  // Permitted transitions: CONSTRUCTED -> PREPARED -> READY, with STOPPED reachable from anywhere.
  private final StateMachine<State> stateMachine = StateMachine.<State>builder("storage")
      .logTransitions()
      .initialState(State.CONSTRUCTED)
      .addState(State.CONSTRUCTED, State.PREPARED, State.STOPPED)
      .addState(State.PREPARED, State.READY, State.STOPPED)
      .addState(State.READY, State.STOPPED)
      // STOPPED is allowed to cycle onto itself so that stopping again does not throw and
      // call sites do not have to check the state first.
      .addState(State.STOPPED, State.STOPPED)
      .build();

  /**
   * Creates an order-enforcing wrapper.
   *
   * @param wrapped Storage that all calls are delegated to; must not be null.
   * @param eventSink Sink that receives the task events published during {@link #start(Quiet)};
   *     must not be null.
   */
  @Inject
  CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped, EventSink eventSink) {
    this.wrapped = requireNonNull(wrapped);
    this.eventSink = requireNonNull(eventSink);
  }

  /**
   * Verifies that the storage is currently in the given lifecycle state.
   *
   * @param expected State the storage must be in.
   * @throws StorageException A {@link TransientStorageException} wrapping the
   *     {@link IllegalStateException} raised by the state machine check.
   */
  private void checkState(State expected) throws StorageException {
    try {
      stateMachine.checkState(expected);
    } catch (IllegalStateException cause) {
      throw new TransientStorageException("Storage is not " + expected, cause);
    }
  }

  /**
   * Prepares the wrapped storage and records the PREPARED state. Only valid while the storage is
   * still in its initial CONSTRUCTED state.
   *
   * @throws StorageException If the storage is not in the CONSTRUCTED state.
   */
  @Override
  public void prepare() throws StorageException {
    checkState(State.CONSTRUCTED);
    wrapped.prepare();
    stateMachine.transition(State.PREPARED);
  }

  /**
   * Starts the wrapped storage, records the READY state, and then publishes an initialization
   * event for every stored task.
   *
   * @param initializationLogic Work handed unchanged to the wrapped storage's {@code start}.
   * @throws StorageException If the storage is not in the PREPARED state.
   */
  @Override
  public void start(Quiet initializationLogic) throws StorageException {
    checkState(State.PREPARED);
    wrapped.start(initializationLogic);
    stateMachine.transition(State.READY);
    publishInitialTaskStates();
  }

  /**
   * Fetches all tasks inside a write operation on the wrapped storage, sorts a copy of them with
   * {@link Tasks#LATEST_ACTIVITY}, and posts a {@link TaskStateChange#initialized} event for each
   * one to the event sink.
   */
  private void publishInitialTaskStates() throws StorageException {
    NoResult.Quiet publisher = storeProvider -> {
      Iterable<IScheduledTask> ordered = Tasks.LATEST_ACTIVITY.sortedCopy(
          storeProvider.getTaskStore().fetchTasks(Query.unscoped()));
      ordered.forEach(task -> eventSink.post(TaskStateChange.initialized(task)));
    };
    // Goes straight to the wrapped storage rather than through this class's write().
    wrapped.write(publisher);
  }

  /**
   * Stops the wrapped storage and records the STOPPED state. No state check is performed first;
   * the state machine is configured to accept STOPPED as a target from every state.
   */
  @Override
  public void stop() {
    wrapped.stop();
    stateMachine.transition(State.STOPPED);
  }

  /**
   * Delegates a read operation to the wrapped storage once the storage is READY.
   *
   * @param work Read work to execute.
   * @return Whatever the wrapped storage returns for {@code work}.
   * @throws StorageException If the storage is not in the READY state.
   * @throws E If the wrapped storage propagates the work's exception.
   */
  @Override
  public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
    checkState(State.READY);
    return wrapped.read(work);
  }

  /**
   * Delegates a write operation to the wrapped storage once the storage is READY.
   *
   * @param work Mutating work to execute.
   * @return Whatever the wrapped storage returns for {@code work}.
   * @throws StorageException If the storage is not in the READY state.
   * @throws E If the wrapped storage propagates the work's exception.
   */
  @Override
  public <T, E extends Exception> T write(MutateWork<T, E> work)
      throws StorageException, E {

    checkState(State.READY);
    return wrapped.write(work);
  }

  /**
   * Builds a private binding module that routes {@link Storage} and {@link NonVolatileStorage}
   * to a singleton {@link CallOrderEnforcingStorage} wrapping the supplied storage class. Only
   * those two bindings are exposed from the module.
   *
   * @param storageClass Non-volatile storage implementation class to wrap.
   * @return Binding module.
   */
  public static Module wrappingModule(final Class<? extends NonVolatileStorage> storageClass) {
    return new PrivateModule() {
      @Override
      protected void configure() {
        final Class<CallOrderEnforcingStorage> enforcer = CallOrderEnforcingStorage.class;

        // Public-facing storage types resolve to the order-enforcing wrapper.
        bind(Storage.class).to(enforcer);
        bind(NonVolatileStorage.class).to(enforcer);
        bind(enforcer).in(Singleton.class);

        // The qualified binding feeds the real implementation into the wrapper's constructor.
        bind(NonVolatileStorage.class).annotatedWith(EnforceOrderOn.class).to(storageClass);

        expose(Storage.class);
        expose(NonVolatileStorage.class);
      }
    };
  }
}