blob: 4e269b68d6eca74b52e9ae3978df4721a93f8573 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Optional;
import javax.inject.Qualifier;
import org.apache.aurora.scheduler.base.Query.Builder;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
/**
* Manages scheduler storage operations providing an interface to perform atomic changes.
*/
public interface Storage {
/**
* Provider for read-only stores. Store implementations must be thread-safe, and should support
* concurrent reads where appropriate.
*/
interface StoreProvider {
SchedulerStore getSchedulerStore();
CronJobStore getCronJobStore();
TaskStore getTaskStore();
QuotaStore getQuotaStore();
AttributeStore getAttributeStore();
JobUpdateStore getJobUpdateStore();
HostMaintenanceStore getHostMaintenanceStore();
}
/**
* Provider for stores that permit mutations. Store implementations need not support concurrent
* writes, as a global reentrant write lock is used to serialize write operations.
*/
interface MutableStoreProvider extends StoreProvider {
SchedulerStore.Mutable getSchedulerStore();
CronJobStore.Mutable getCronJobStore();
/**
* Gets access to the mutable task store.
* <p>
* This is labeled as unsafe, since it's rare that a caller should be using this. In most
* cases, mutations to the task store should be done through
* {@link org.apache.aurora.scheduler.state.StateManager}.
* <p>
* TODO(William Farner): Come up with a way to restrict access to this interface. As it stands,
* it's trivial for an unsuspecting caller to modify the task store directly and subvert the
* state machine and side effect systems.
*
* @return The mutable task store.
*/
TaskStore.Mutable getUnsafeTaskStore();
QuotaStore.Mutable getQuotaStore();
AttributeStore.Mutable getAttributeStore();
JobUpdateStore.Mutable getJobUpdateStore();
HostMaintenanceStore.Mutable getHostMaintenanceStore();
}
/**
* A unit of work to perform against the storage system.
*
* @param <S> Type of stores the work needs access to.
* @param <T> Return type of the operation.
* @param <E> Exception type thrown by the operation.
*/
@FunctionalInterface
interface StorageOperation<S extends StoreProvider, T, E extends Exception> {
/**
* Abstracts a unit of work that has a result, but may also throw a specific exception.
*
* @param storeProvider A provider to give access to different available stores.
* @return the result of the successfully completed unit of work
* @throws E if the unit of work could not be completed
*/
T apply(S storeProvider) throws E;
}
/**
* Encapsulates a read-only storage operation.
*
* @param <T> The type of result this unit of work produces.
* @param <E> The type of exception this unit of work can throw.
*/
@FunctionalInterface
interface Work<T, E extends Exception> extends StorageOperation<StoreProvider, T, E> {
/**
* A convenient typedef for Work that throws no checked exceptions - it runs quietly.
*
* @param <T> The type of result this unit of work produces.
*/
@FunctionalInterface
interface Quiet<T> extends Work<T, RuntimeException> {
// typedef
}
}
/**
* Encapsulates a storage operation, which has mutable storage access.
*
* @param <T> The type of result this unit of work produces.
* @param <E> The type of exception this unit of work can throw.
*/
@FunctionalInterface
interface MutateWork<T, E extends Exception>
extends StorageOperation<MutableStoreProvider, T, E> {
/**
* A convenient typedef for Work that throws no checked exceptions - it runs quietly.
*
* @param <T> The type of result this unit of work produces.
*/
@FunctionalInterface
interface Quiet<T> extends MutateWork<T, RuntimeException> {
// typedef
}
/**
* Encapsulates work that returns no result.
*
* @param <E> The type of exception this unit of work can throw.
*/
@FunctionalInterface
interface NoResult<E extends Exception> extends MutateWork<Void, E> {
@Override
default Void apply(MutableStoreProvider storeProvider) throws E {
execute(storeProvider);
return null;
}
/**
* Similar to {@link #apply(MutableStoreProvider)} except that no result is
* returned.
*
* @param storeProvider A provider to give access to different available stores.
* @throws E if the unit of work could not be completed
*/
void execute(MutableStoreProvider storeProvider) throws E;
/**
* A convenient typedef for Work with no result that throws no checked exceptions - it runs
* quitely.
*/
@FunctionalInterface
interface Quiet extends NoResult<RuntimeException> {
// typedef
}
}
}
/**
* Indicates a problem reading from or writing to stable storage.
*/
class StorageException extends SchedulerException {
public StorageException(String message, Throwable cause) {
super(message, cause);
}
public StorageException(String message) {
super(message);
}
}
/**
* Indicates that stable storage is temporarily unavailable.
*/
class TransientStorageException extends StorageException {
public TransientStorageException(String message, Throwable cause) {
super(message, cause);
}
public TransientStorageException(String message) {
super(message);
}
}
/**
* Executes the unit of read-only {@code work}. The consistency model creates the possibility
* for a reader to read uncommitted state from a concurrent writer.
* <p>
* TODO(wfarner): This method no longer needs to exist now that there is no global locking for
* reads. We could instead directly inject the individual stores where they are used, as long
* as the stores have a layer to replicate what is currently done by
* {@link CallOrderEnforcingStorage}.
*
* @param work The unit of work to execute.
* @param <T> The type of result this unit of work produces.
* @param <E> The type of exception this unit of work can throw.
* @return the result when the unit of work completes successfully
* @throws StorageException if there was a problem reading from stable storage.
* @throws E bubbled transparently when the unit of work throws
*/
<T, E extends Exception> T read(Work<T, E> work) throws StorageException, E;
/**
* Executes the unit of mutating {@code work}.
* TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
* performed after completion of the outer-most transaction. As it stands, it's somewhat
* futile to try to achieve this within a transaction, since the local code does not know
* if the current transaction is nested.
*
* @param work The unit of work to execute.
* @param <T> The type of result this unit of work produces.
* @param <E> The type of exception this unit of work can throw.
* @return the result when the unit of work completes successfully
* @throws StorageException if there was a problem reading from or writing to stable storage.
* @throws E bubbled transparently when the unit of work throws
*/
<T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
/**
* Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
* out of date data, etc. This method should not block.
*
* @throws StorageException if there was a problem preparing storage.
*/
void prepare() throws StorageException;
/**
* A non-volatile storage that has additional methods to control its lifecycle.
*/
interface NonVolatileStorage extends Storage {
/**
* Prepares the underlying storage for serving traffic.
*
* @param initializationLogic work to perform after this storage system is ready but before
* allowing general use of
* {@link #read}.
* @throws StorageException if there was a starting storage.
*/
void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
/**
* Prepares the underlying storage system for clean shutdown.
*/
void stop();
}
/**
* Identifies a storage layer that is in-memory only.
* This generally should only be used when the storage is first starting up, to perform queries
* related to initially load the storage.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.PARAMETER, ElementType.METHOD })
@Qualifier
@interface Volatile { }
/**
* Utility functions for interacting with a Storage instance.
*/
final class Util {
private Util() {
// Utility class.
}
/**
* Fetch tasks matching the query returned by {@code query} from {@code storage} in a
* read operation.
*
* @see #fetchTasks
* @param storage Storage instance to query from.
* @param query Builder of the query to perform.
* @return Tasks returned from the query.
*/
public static Iterable<IScheduledTask> fetchTasks(Storage storage, Builder query) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTasks(query));
}
public static Optional<IScheduledTask> fetchTask(Storage storage, String taskId) {
return storage.read(storeProvider -> storeProvider.getTaskStore().fetchTask(taskId));
}
public static Iterable<IJobConfiguration> fetchCronJobs(Storage storage) {
return storage.read(storeProvider -> storeProvider.getCronJobStore().fetchJobs());
}
}
}