| /** |
| * Copyright 2013 Apache Software Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.thrift; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import javax.annotation.Nullable; |
| import javax.inject.Inject; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Functions; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Predicates; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.ImmutableMultimap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.collect.Multimaps; |
| import com.google.common.collect.Sets; |
| import com.twitter.common.args.Arg; |
| import com.twitter.common.args.CmdLine; |
| import com.twitter.common.base.MorePreconditions; |
| import com.twitter.common.base.Supplier; |
| import com.twitter.common.quantity.Amount; |
| import com.twitter.common.quantity.Time; |
| import com.twitter.common.util.BackoffHelper; |
| |
| import org.apache.aurora.auth.CapabilityValidator; |
| import org.apache.aurora.auth.CapabilityValidator.AuditCheck; |
| import org.apache.aurora.auth.CapabilityValidator.Capability; |
| import org.apache.aurora.auth.SessionValidator.AuthFailedException; |
| import org.apache.aurora.gen.AcquireLockResult; |
| import org.apache.aurora.gen.AddInstancesConfig; |
| import org.apache.aurora.gen.AuroraAdmin; |
| import org.apache.aurora.gen.ConfigRewrite; |
| import org.apache.aurora.gen.DrainHostsResult; |
| import org.apache.aurora.gen.EndMaintenanceResult; |
| import org.apache.aurora.gen.GetJobsResult; |
| import org.apache.aurora.gen.GetQuotaResult; |
| import org.apache.aurora.gen.Hosts; |
| import org.apache.aurora.gen.InstanceConfigRewrite; |
| import org.apache.aurora.gen.InstanceKey; |
| import org.apache.aurora.gen.JobConfigRewrite; |
| import org.apache.aurora.gen.JobConfigValidation; |
| import org.apache.aurora.gen.JobConfiguration; |
| import org.apache.aurora.gen.JobKey; |
| import org.apache.aurora.gen.JobSummary; |
| import org.apache.aurora.gen.JobSummaryResult; |
| import org.apache.aurora.gen.ListBackupsResult; |
| import org.apache.aurora.gen.Lock; |
| import org.apache.aurora.gen.LockKey; |
| import org.apache.aurora.gen.LockValidation; |
| import org.apache.aurora.gen.MaintenanceStatusResult; |
| import org.apache.aurora.gen.PopulateJobResult; |
| import org.apache.aurora.gen.QueryRecoveryResult; |
| import org.apache.aurora.gen.Quota; |
| import org.apache.aurora.gen.Response; |
| import org.apache.aurora.gen.ResponseCode; |
| import org.apache.aurora.gen.Result; |
| import org.apache.aurora.gen.RewriteConfigsRequest; |
| import org.apache.aurora.gen.ScheduleStatus; |
| import org.apache.aurora.gen.ScheduleStatusResult; |
| import org.apache.aurora.gen.SessionKey; |
| import org.apache.aurora.gen.StartMaintenanceResult; |
| import org.apache.aurora.gen.TaskConfig; |
| import org.apache.aurora.gen.TaskQuery; |
| import org.apache.aurora.scheduler.base.JobKeys; |
| import org.apache.aurora.scheduler.base.Query; |
| import org.apache.aurora.scheduler.base.ScheduleException; |
| import org.apache.aurora.scheduler.base.Tasks; |
| import org.apache.aurora.scheduler.configuration.ConfigurationManager; |
| import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException; |
| import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; |
| import org.apache.aurora.scheduler.quota.Quotas; |
| import org.apache.aurora.scheduler.state.CronJobManager; |
| import org.apache.aurora.scheduler.state.LockManager; |
| import org.apache.aurora.scheduler.state.LockManager.LockException; |
| import org.apache.aurora.scheduler.state.MaintenanceController; |
| import org.apache.aurora.scheduler.state.SchedulerCore; |
| import org.apache.aurora.scheduler.storage.JobStore; |
| import org.apache.aurora.scheduler.storage.Storage; |
| import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork; |
| import org.apache.aurora.scheduler.storage.Storage.StoreProvider; |
| import org.apache.aurora.scheduler.storage.Storage.Work; |
| import org.apache.aurora.scheduler.storage.backup.Recovery; |
| import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryException; |
| import org.apache.aurora.scheduler.storage.backup.StorageBackup; |
| import org.apache.aurora.scheduler.storage.entities.IAssignedTask; |
| import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; |
| import org.apache.aurora.scheduler.storage.entities.IJobKey; |
| import org.apache.aurora.scheduler.storage.entities.ILock; |
| import org.apache.aurora.scheduler.storage.entities.ILockKey; |
| import org.apache.aurora.scheduler.storage.entities.IQuota; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.storage.entities.ITaskConfig; |
| import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift; |
| import org.apache.aurora.scheduler.thrift.auth.Requires; |
| import org.apache.commons.lang.StringUtils; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.twitter.common.base.MorePreconditions.checkNotBlank; |
| |
| import static org.apache.aurora.auth.SessionValidator.SessionContext; |
| import static org.apache.aurora.gen.ResponseCode.AUTH_FAILED; |
| import static org.apache.aurora.gen.ResponseCode.ERROR; |
| import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST; |
| import static org.apache.aurora.gen.ResponseCode.LOCK_ERROR; |
| import static org.apache.aurora.gen.ResponseCode.OK; |
| import static org.apache.aurora.gen.apiConstants.CURRENT_API_VERSION; |
| |
| /** |
| * Aurora scheduler thrift server implementation. |
| * <p> |
| * Interfaces between users and the scheduler to access/modify jobs and perform cluster |
| * administration tasks. |
| */ |
| @DecoratedThrift |
| class SchedulerThriftInterface implements AuroraAdmin.Iface { |
// Logger named after this class.
private static final Logger LOG = Logger.getLogger(SchedulerThriftInterface.class.getName());

// Command line argument for the first delay between polls in killTasks; defaults to 1 second.
@CmdLine(name = "kill_task_initial_backoff",
    help = "Initial backoff delay while waiting for the tasks to transition to KILLED.")
private static final Arg<Amount<Long, Time>> KILL_TASK_INITIAL_BACKOFF =
    Arg.create(Amount.of(1L, Time.SECONDS));

// Command line argument for the largest delay between polls in killTasks; defaults to 30 seconds.
@CmdLine(name = "kill_task_max_backoff",
    help = "Max backoff delay while waiting for the tasks to transition to KILLED.")
private static final Arg<Amount<Long, Time>> KILL_TASK_MAX_BACKOFF =
    Arg.create(Amount.of(30L, Time.SECONDS));

// Maps a scheduled task to the role of its owner: Tasks.SCHEDULED_TO_INFO is applied first and
// the owner role is then read from the resulting task configuration.
private static final Function<IScheduledTask, String> GET_ROLE = Functions.compose(
    new Function<ITaskConfig, String>() {
      @Override public String apply(ITaskConfig task) {
        return task.getOwner().getRole();
      }
    },
    Tasks.SCHEDULED_TO_INFO);

// Collaborators and settings supplied through the constructors; all are checked to be non-null.
private final Storage storage;
private final SchedulerCore schedulerCore;
private final LockManager lockManager;
private final CapabilityValidator sessionValidator;
private final StorageBackup backup;
private final Recovery recovery;
private final MaintenanceController maintenance;
private final CronJobManager cronJobManager;
// Backoff bounds handed to the BackoffHelper that killTasks uses while polling.
private final Amount<Long, Time> killTaskInitialBackoff;
private final Amount<Long, Time> killTaskMaxBackoff;
| |
/**
 * Injection constructor. Reads the kill-task backoff bounds from the
 * {@code kill_task_initial_backoff} and {@code kill_task_max_backoff} command line arguments
 * and delegates to the full constructor.
 */
@Inject
SchedulerThriftInterface(
    Storage storage,
    SchedulerCore schedulerCore,
    LockManager lockManager,
    CapabilityValidator sessionValidator,
    StorageBackup backup,
    Recovery recovery,
    CronJobManager cronJobManager,
    MaintenanceController maintenance) {

  // The delegate expects the maintenance controller before the cron job manager.
  this(
      storage, schedulerCore, lockManager, sessionValidator, backup, recovery,
      maintenance, cronJobManager,
      KILL_TASK_INITIAL_BACKOFF.get(), KILL_TASK_MAX_BACKOFF.get());
}
| |
/**
 * Full constructor, exposed so tests can supply explicit backoff bounds.
 *
 * @param initialBackoff initial delay used while polling for killed tasks.
 * @param maxBackoff maximum delay used while polling for killed tasks.
 * @throws NullPointerException if any argument is null.
 */
@VisibleForTesting
SchedulerThriftInterface(
    Storage storage,
    SchedulerCore schedulerCore,
    LockManager lockManager,
    CapabilityValidator sessionValidator,
    StorageBackup backup,
    Recovery recovery,
    MaintenanceController maintenance,
    CronJobManager cronJobManager,
    Amount<Long, Time> initialBackoff,
    Amount<Long, Time> maxBackoff) {

  // Validate everything up front, in declaration order, before touching any field.
  checkNotNull(storage);
  checkNotNull(schedulerCore);
  checkNotNull(lockManager);
  checkNotNull(sessionValidator);
  checkNotNull(backup);
  checkNotNull(recovery);
  checkNotNull(maintenance);
  checkNotNull(cronJobManager);
  checkNotNull(initialBackoff);
  checkNotNull(maxBackoff);

  this.storage = storage;
  this.schedulerCore = schedulerCore;
  this.lockManager = lockManager;
  this.sessionValidator = sessionValidator;
  this.backup = backup;
  this.recovery = recovery;
  this.maintenance = maintenance;
  this.cronJobManager = cronJobManager;
  this.killTaskInitialBackoff = initialBackoff;
  this.killTaskMaxBackoff = maxBackoff;
}
| |
| @Override |
| public Response createJob( |
| JobConfiguration mutableJob, |
| @Nullable Lock mutableLock, |
| SessionKey session) { |
| |
| IJobConfiguration job = IJobConfiguration.build(mutableJob); |
| IJobKey jobKey = JobKeys.assertValid(job.getKey()); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| |
| try { |
| sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole())); |
| } catch (AuthFailedException e) { |
| return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| } |
| |
| try { |
| SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job); |
| |
| lockManager.validateIfLocked( |
| ILockKey.build(LockKey.job(jobKey.newBuilder())), |
| Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); |
| |
| schedulerCore.createJob(sanitized); |
| response.setResponseCode(OK) |
| .setMessage(String.format("%d new tasks pending for job %s", |
| sanitized.getJobConfig().getInstanceCount(), JobKeys.toPath(job))); |
| } catch (LockException e) { |
| response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage()); |
| } catch (TaskDescriptionException | ScheduleException e) { |
| response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage()); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response replaceCronTemplate( |
| JobConfiguration mutableConfig, |
| @Nullable Lock mutableLock, |
| SessionKey session) { |
| |
| checkNotNull(mutableConfig); |
| IJobConfiguration job = IJobConfiguration.build(mutableConfig); |
| IJobKey jobKey = JobKeys.assertValid(job.getKey()); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| try { |
| sessionValidator.checkAuthenticated(session, ImmutableSet.of(job.getOwner().getRole())); |
| } catch (AuthFailedException e) { |
| return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| } |
| |
| try { |
| lockManager.validateIfLocked( |
| ILockKey.build(LockKey.job(jobKey.newBuilder())), |
| Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); |
| |
| SanitizedConfiguration sanitized = SanitizedConfiguration.fromUnsanitized(job); |
| |
| if (!cronJobManager.hasJob(jobKey)) { |
| return response.setResponseCode(INVALID_REQUEST).setMessage( |
| "No cron template found for the given key: " + jobKey); |
| } |
| cronJobManager.updateJob(sanitized); |
| return response.setResponseCode(OK).setMessage("Replaced template for: " + jobKey); |
| |
| } catch (LockException e) { |
| return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage()); |
| } catch (TaskDescriptionException | ScheduleException e) { |
| return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage()); |
| } |
| } |
| |
| @Override |
| public Response populateJobConfig(JobConfiguration description, JobConfigValidation validation) { |
| |
| checkNotNull(description); |
| |
| Response response = new Response(); |
| try { |
| SanitizedConfiguration sanitized = |
| SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(description)); |
| |
| // TODO(maximk): Consider moving job validation logic into a dedicated RPC. MESOS-4476. |
| if (validation != null && validation == JobConfigValidation.RUN_FILTERS) { |
| schedulerCore.validateJobResources(sanitized); |
| } |
| |
| PopulateJobResult result = new PopulateJobResult() |
| .setPopulated(ITaskConfig.toBuildersSet(sanitized.getTaskConfigs().values())); |
| response.setResult(Result.populateJobResult(result)) |
| .setResponseCode(OK) |
| .setMessage("Tasks populated"); |
| } catch (TaskDescriptionException | ScheduleException e) { |
| response.setResponseCode(INVALID_REQUEST) |
| .setMessage("Invalid configuration: " + e.getMessage()); |
| } |
| return response; |
| } |
| |
| @Override |
| public Response startCronJob(JobKey mutableJobKey, SessionKey session) { |
| checkNotNull(session); |
| IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey)); |
| |
| Response response = new Response(); |
| try { |
| sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole())); |
| } catch (AuthFailedException e) { |
| response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| return response; |
| } |
| |
| try { |
| schedulerCore.startCronJob(jobKey); |
| response.setResponseCode(OK).setMessage("Cron run started."); |
| } catch (ScheduleException e) { |
| response.setResponseCode(INVALID_REQUEST) |
| .setMessage("Failed to start cron job - " + e.getMessage()); |
| } catch (TaskDescriptionException e) { |
| response.setResponseCode(ERROR).setMessage("Invalid task description: " + e.getMessage()); |
| } |
| |
| return response; |
| } |
| |
| // TODO(William Farner): Provide status information about cron jobs here. |
| @Override |
| public Response getTasksStatus(TaskQuery query) { |
| checkNotNull(query); |
| |
| Set<IScheduledTask> tasks = |
| Storage.Util.weaklyConsistentFetchTasks(storage, Query.arbitrary(query)); |
| |
| Response response = new Response(); |
| |
| if (tasks.isEmpty()) { |
| response.setResponseCode(INVALID_REQUEST) |
| .setMessage("No tasks found for query: " + query); |
| } else { |
| response.setResponseCode(OK) |
| .setResult(Result.scheduleStatusResult( |
| new ScheduleStatusResult().setTasks(IScheduledTask.toBuildersList(tasks)))); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response getJobSummary() { |
| Set<IScheduledTask> tasks = Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()); |
| Multimap<String, IJobKey> jobsByRole = Multimaps.index( |
| FluentIterable.from(tasks).transform(Tasks.SCHEDULED_TO_JOB_KEY), |
| JobKeys.TO_ROLE); |
| |
| Multimap<String, IJobKey> cronJobsByRole = Multimaps.index( |
| FluentIterable.from(cronJobManager.getJobs()).transform(JobKeys.FROM_CONFIG), |
| JobKeys.TO_ROLE); |
| |
| List<JobSummary> jobSummaries = Lists.newLinkedList(); |
| for (String role : Sets.union(jobsByRole.keySet(), cronJobsByRole.keySet())) { |
| JobSummary summary = new JobSummary(); |
| summary.setRole(role); |
| summary.setJobCount(jobsByRole.get(role).size()); |
| summary.setCronJobCount(cronJobsByRole.get(role).size()); |
| jobSummaries.add(summary); |
| } |
| |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.jobSummaryResult(new JobSummaryResult(jobSummaries))); |
| } |
| |
| @Override |
| public Response getJobs(@Nullable String maybeNullRole) { |
| Optional<String> ownerRole = Optional.fromNullable(maybeNullRole); |
| |
| |
| // Ensure we only return one JobConfiguration for each JobKey. |
| Map<IJobKey, IJobConfiguration> jobs = Maps.newHashMap(); |
| |
| // Query the task store, find immediate jobs, and synthesize a JobConfiguration for them. |
| // This is necessary because the ImmediateJobManager doesn't store jobs directly and |
| // ImmediateJobManager#getJobs always returns an empty Collection. |
| Query.Builder scope = ownerRole.isPresent() |
| ? Query.roleScoped(ownerRole.get()) |
| : Query.unscoped(); |
| Multimap<IJobKey, IScheduledTask> tasks = |
| Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, scope.active())); |
| |
| jobs.putAll(Maps.transformEntries(tasks.asMap(), |
| new Maps.EntryTransformer<IJobKey, Collection<IScheduledTask>, IJobConfiguration>() { |
| @Override |
| public IJobConfiguration transformEntry( |
| IJobKey jobKey, |
| Collection<IScheduledTask> tasks) { |
| |
| // Pick an arbitrary task for each immediate job. The chosen task might not be the most |
| // recent if the job is in the middle of an update or some shards have been selectively |
| // created. |
| TaskConfig firstTask = tasks.iterator().next().getAssignedTask().getTask().newBuilder(); |
| return IJobConfiguration.build(new JobConfiguration() |
| .setKey(jobKey.newBuilder()) |
| .setOwner(firstTask.getOwner()) |
| .setTaskConfig(firstTask) |
| .setInstanceCount(tasks.size())); |
| } |
| })); |
| |
| // Get cron jobs directly from the manager. Do this after querying the task store so the real |
| // template JobConfiguration for a cron job will overwrite the synthesized one that could have |
| // been created above. |
| Predicate<IJobConfiguration> configFilter = ownerRole.isPresent() |
| ? Predicates.compose(Predicates.equalTo(ownerRole.get()), JobKeys.CONFIG_TO_ROLE) |
| : Predicates.<IJobConfiguration>alwaysTrue(); |
| jobs.putAll(Maps.uniqueIndex( |
| FluentIterable.from(cronJobManager.getJobs()).filter(configFilter), |
| JobKeys.FROM_CONFIG)); |
| |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.getJobsResult(new GetJobsResult() |
| .setConfigs(IJobConfiguration.toBuildersSet(jobs.values())))); |
| } |
| |
| private void validateLockForTasks(Optional<ILock> lock, Iterable<IScheduledTask> tasks) |
| throws LockException { |
| |
| ImmutableSet<IJobKey> uniqueKeys = FluentIterable.from(tasks) |
| .transform(Tasks.SCHEDULED_TO_JOB_KEY) |
| .toSet(); |
| |
| // Validate lock against every unique job key derived from the tasks. |
| for (IJobKey key : uniqueKeys) { |
| lockManager.validateIfLocked(ILockKey.build(LockKey.job(key.newBuilder())), lock); |
| } |
| } |
| |
| private SessionContext validateSessionKeyForTasks( |
| SessionKey session, |
| TaskQuery taskQuery, |
| Iterable<IScheduledTask> tasks) throws AuthFailedException { |
| |
| // Authenticate the session against any affected roles, always including the role for a |
| // role-scoped query. This papers over the implementation detail that dormant cron jobs are |
| // authenticated this way. |
| ImmutableSet.Builder<String> targetRoles = ImmutableSet.<String>builder() |
| .addAll(FluentIterable.from(tasks).transform(GET_ROLE)); |
| if (taskQuery.isSetOwner()) { |
| targetRoles.add(taskQuery.getOwner().getRole()); |
| } |
| return sessionValidator.checkAuthenticated(session, targetRoles.build()); |
| } |
| |
| private Optional<SessionContext> isAdmin(SessionKey session) { |
| try { |
| return Optional.of( |
| sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED)); |
| } catch (AuthFailedException e) { |
| return Optional.absent(); |
| } |
| } |
| |
| @Override |
| public Response killTasks(final TaskQuery query, Lock mutablelock, SessionKey session) { |
| checkNotNull(query); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| |
| if (query.getJobName() != null && StringUtils.isBlank(query.getJobName())) { |
| response.setResponseCode(INVALID_REQUEST).setMessage( |
| String.format("Invalid job name: '%s'", query.getJobName())); |
| return response; |
| } |
| |
| Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, Query.arbitrary(query)); |
| |
| Optional<SessionContext> context = isAdmin(session); |
| if (context.isPresent()) { |
| LOG.info("Granting kill query to admin user: " + query); |
| } else { |
| try { |
| context = Optional.of(validateSessionKeyForTasks(session, query, tasks)); |
| } catch (AuthFailedException e) { |
| response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| return response; |
| } |
| } |
| |
| try { |
| validateLockForTasks(Optional.fromNullable(mutablelock).transform(ILock.FROM_BUILDER), tasks); |
| schedulerCore.killTasks(Query.arbitrary(query), context.get().getIdentity()); |
| } catch (LockException e) { |
| return response.setResponseCode(LOCK_ERROR).setMessage(e.getMessage()); |
| } catch (ScheduleException e) { |
| return response.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage()); |
| } |
| |
| // TODO(William Farner): Move this into the client. |
| BackoffHelper backoff = new BackoffHelper(killTaskInitialBackoff, killTaskMaxBackoff, true); |
| final Query.Builder activeQuery = Query.arbitrary(query.setStatuses(Tasks.ACTIVE_STATES)); |
| try { |
| backoff.doUntilSuccess(new Supplier<Boolean>() { |
| @Override public Boolean get() { |
| Set<IScheduledTask> tasks = Storage.Util.consistentFetchTasks(storage, activeQuery); |
| if (tasks.isEmpty()) { |
| LOG.info("Tasks all killed, done waiting."); |
| return true; |
| } else { |
| LOG.info("Jobs not yet killed, waiting..."); |
| return false; |
| } |
| } |
| }); |
| response.setResponseCode(OK).setMessage("Tasks killed."); |
| } catch (InterruptedException e) { |
| LOG.warning("Interrupted while trying to kill tasks: " + e); |
| Thread.currentThread().interrupt(); |
| response.setResponseCode(ERROR).setMessage("killTasks thread was interrupted."); |
| } catch (BackoffHelper.BackoffStoppedException e) { |
| response.setResponseCode(ERROR).setMessage("Tasks were not killed in time."); |
| } |
| return response; |
| } |
| |
| @Override |
| public Response restartShards( |
| JobKey mutableJobKey, |
| Set<Integer> shardIds, |
| Lock mutableLock, |
| SessionKey session) { |
| |
| IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey)); |
| MorePreconditions.checkNotBlank(shardIds); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| SessionContext context; |
| try { |
| context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole())); |
| } catch (AuthFailedException e) { |
| response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| return response; |
| } |
| |
| try { |
| lockManager.validateIfLocked( |
| ILockKey.build(LockKey.job(jobKey.newBuilder())), |
| Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); |
| schedulerCore.restartShards(jobKey, shardIds, context.getIdentity()); |
| response.setResponseCode(OK).setMessage("Shards are restarting."); |
| } catch (LockException e) { |
| response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage()); |
| } catch (ScheduleException e) { |
| response.setResponseCode(ResponseCode.INVALID_REQUEST).setMessage(e.getMessage()); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response getQuota(final String ownerRole) { |
| checkNotBlank(ownerRole); |
| |
| IQuota quota = storage.consistentRead(new Work.Quiet<IQuota>() { |
| @Override public IQuota apply(StoreProvider storeProvider) { |
| return storeProvider.getQuotaStore().fetchQuota(ownerRole).or(Quotas.noQuota()); |
| } |
| }); |
| |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.getQuotaResult(new GetQuotaResult() |
| .setQuota(quota.newBuilder()))); |
| } |
| |
| @Override |
| public Response startMaintenance(Hosts hosts, SessionKey session) { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.startMaintenanceResult(new StartMaintenanceResult() |
| .setStatuses(maintenance.startMaintenance(hosts.getHostNames())))); |
| } |
| |
| @Override |
| public Response drainHosts(Hosts hosts, SessionKey session) { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.drainHostsResult(new DrainHostsResult() |
| .setStatuses(maintenance.drain(hosts.getHostNames())))); |
| } |
| |
| @Override |
| public Response maintenanceStatus(Hosts hosts, SessionKey session) { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.maintenanceStatusResult(new MaintenanceStatusResult() |
| .setStatuses(maintenance.getStatus(hosts.getHostNames())))); |
| } |
| |
| @Override |
| public Response endMaintenance(Hosts hosts, SessionKey session) { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.endMaintenanceResult(new EndMaintenanceResult() |
| .setStatuses(maintenance.endMaintenance(hosts.getHostNames())))); |
| } |
| |
| @Requires(whitelist = Capability.PROVISIONER) |
| @Override |
| public Response setQuota(final String ownerRole, final Quota quota, SessionKey session) { |
| checkNotBlank(ownerRole); |
| checkNotNull(quota); |
| checkNotNull(session); |
| |
| // TODO(Kevin Sweeney): Input validation for Quota. |
| |
| storage.write(new MutateWork.NoResult.Quiet() { |
| @Override protected void execute(MutableStoreProvider storeProvider) { |
| storeProvider.getQuotaStore().saveQuota(ownerRole, IQuota.build(quota)); |
| } |
| }); |
| |
| return new Response().setResponseCode(OK).setMessage("Quota applied."); |
| } |
| |
| @Override |
| public Response forceTaskState( |
| String taskId, |
| ScheduleStatus status, |
| SessionKey session) { |
| |
| checkNotBlank(taskId); |
| checkNotNull(status); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| SessionContext context; |
| try { |
| // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext. |
| context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED); |
| } catch (AuthFailedException e) { |
| response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| return response; |
| } |
| |
| schedulerCore.setTaskStatus( |
| Query.taskScoped(taskId), status, transitionMessage(context.getIdentity())); |
| return new Response().setResponseCode(OK).setMessage("Transition attempted."); |
| } |
| |
| @Override |
| public Response performBackup(SessionKey session) { |
| backup.backupNow(); |
| return new Response().setResponseCode(OK); |
| } |
| |
| @Override |
| public Response listBackups(SessionKey session) { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.listBackupsResult(new ListBackupsResult() |
| .setBackups(recovery.listBackups()))); |
| } |
| |
| @Override |
| public Response stageRecovery(String backupId, SessionKey session) { |
| Response response = new Response().setResponseCode(OK); |
| try { |
| recovery.stage(backupId); |
| } catch (RecoveryException e) { |
| response.setResponseCode(ERROR).setMessage(e.getMessage()); |
| LOG.log(Level.WARNING, "Failed to stage recovery: " + e, e); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response queryRecovery(TaskQuery query, SessionKey session) { |
| Response response = new Response(); |
| try { |
| response.setResponseCode(OK) |
| .setResult(Result.queryRecoveryResult(new QueryRecoveryResult() |
| .setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query)))))); |
| } catch (RecoveryException e) { |
| response.setResponseCode(ERROR).setMessage(e.getMessage()); |
| LOG.log(Level.WARNING, "Failed to query recovery: " + e, e); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response deleteRecoveryTasks(TaskQuery query, SessionKey session) { |
| Response response = new Response().setResponseCode(OK); |
| try { |
| recovery.deleteTasks(Query.arbitrary(query)); |
| } catch (RecoveryException e) { |
| response.setResponseCode(ERROR).setMessage(e.getMessage()); |
| LOG.log(Level.WARNING, "Failed to delete recovery tasks: " + e, e); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response commitRecovery(SessionKey session) { |
| Response response = new Response().setResponseCode(OK); |
| try { |
| recovery.commit(); |
| } catch (RecoveryException e) { |
| response.setResponseCode(ERROR).setMessage(e.getMessage()); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public Response unloadRecovery(SessionKey session) { |
| recovery.unload(); |
| return new Response().setResponseCode(OK); |
| } |
| |
| @Override |
| public Response snapshot(SessionKey session) { |
| Response response = new Response(); |
| try { |
| storage.snapshot(); |
| return response.setResponseCode(OK).setMessage("Compaction successful."); |
| } catch (Storage.StorageException e) { |
| LOG.log(Level.WARNING, "Requested snapshot failed.", e); |
| return response.setResponseCode(ERROR).setMessage(e.getMessage()); |
| } |
| } |
| |
| private static Multimap<String, IJobConfiguration> jobsByKey(JobStore jobStore, IJobKey jobKey) { |
| ImmutableMultimap.Builder<String, IJobConfiguration> matches = ImmutableMultimap.builder(); |
| for (String managerId : jobStore.fetchManagerIds()) { |
| for (IJobConfiguration job : jobStore.fetchJobs(managerId)) { |
| if (job.getKey().equals(jobKey)) { |
| matches.put(managerId, job); |
| } |
| } |
| } |
| return matches.build(); |
| } |
| |
| @Override |
| public Response rewriteConfigs( |
| final RewriteConfigsRequest request, |
| SessionKey session) { |
| |
| if (request.getRewriteCommandsSize() == 0) { |
| return new Response() |
| .setResponseCode(ResponseCode.ERROR) |
| .setMessage("No rewrite commands provided."); |
| } |
| |
| return storage.write(new MutateWork.Quiet<Response>() { |
| @Override public Response apply(MutableStoreProvider storeProvider) { |
| List<String> errors = Lists.newArrayList(); |
| |
| for (ConfigRewrite command : request.getRewriteCommands()) { |
| Optional<String> error = rewriteConfig(command, storeProvider); |
| if (error.isPresent()) { |
| errors.add(error.get()); |
| } |
| } |
| |
| Response resp = new Response(); |
| if (!errors.isEmpty()) { |
| resp.setResponseCode(ResponseCode.WARNING).setMessage(Joiner.on(", ").join(errors)); |
| } else { |
| resp.setResponseCode(OK).setMessage("All rewrites completed successfully."); |
| } |
| return resp; |
| } |
| }); |
| } |
| |
| private Optional<String> rewriteConfig( |
| ConfigRewrite command, |
| MutableStoreProvider storeProvider) { |
| |
| Optional<String> error = Optional.absent(); |
| switch (command.getSetField()) { |
| case JOB_REWRITE: |
| JobConfigRewrite jobRewrite = command.getJobRewrite(); |
| IJobConfiguration existingJob = IJobConfiguration.build(jobRewrite.getOldJob()); |
| IJobConfiguration rewrittenJob; |
| try { |
| rewrittenJob = ConfigurationManager.validateAndPopulate( |
| IJobConfiguration.build(jobRewrite.getRewrittenJob())); |
| } catch (TaskDescriptionException e) { |
| // We could add an error here, but this is probably a hint of something wrong in |
| // the client that's causing a bad configuration to be applied. |
| throw Throwables.propagate(e); |
| } |
| if (!existingJob.getKey().equals(rewrittenJob.getKey())) { |
| error = Optional.of("Disallowing rewrite attempting to change job key."); |
| } else if (!existingJob.getOwner().equals(rewrittenJob.getOwner())) { |
| error = Optional.of("Disallowing rewrite attempting to change job owner."); |
| } else { |
| JobStore.Mutable jobStore = storeProvider.getJobStore(); |
| Multimap<String, IJobConfiguration> matches = |
| jobsByKey(jobStore, existingJob.getKey()); |
| switch (matches.size()) { |
| case 0: |
| error = Optional.of("No jobs found for key " + JobKeys.toPath(existingJob)); |
| break; |
| |
| case 1: |
| Map.Entry<String, IJobConfiguration> match = |
| Iterables.getOnlyElement(matches.entries()); |
| IJobConfiguration storedJob = match.getValue(); |
| if (!storedJob.equals(existingJob)) { |
| error = Optional.of("CAS compare failed for " + JobKeys.toPath(storedJob)); |
| } else { |
| jobStore.saveAcceptedJob(match.getKey(), rewrittenJob); |
| } |
| break; |
| |
| default: |
| error = Optional.of("Multiple jobs found for key " + JobKeys.toPath(existingJob)); |
| } |
| } |
| break; |
| |
| case INSTANCE_REWRITE: |
| InstanceConfigRewrite instanceRewrite = command.getInstanceRewrite(); |
| InstanceKey instanceKey = instanceRewrite.getInstanceKey(); |
| Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks( |
| Query.instanceScoped(IJobKey.build(instanceKey.getJobKey()), |
| instanceKey.getInstanceId()) |
| .active()); |
| Optional<IAssignedTask> task = |
| Optional.fromNullable(Iterables.getOnlyElement(tasks, null)) |
| .transform(Tasks.SCHEDULED_TO_ASSIGNED); |
| if (!task.isPresent()) { |
| error = Optional.of("No active task found for " + instanceKey); |
| } else if (!task.get().getTask().newBuilder().equals(instanceRewrite.getOldTask())) { |
| error = Optional.of("CAS compare failed for " + instanceKey); |
| } else { |
| ITaskConfig newConfiguration = ITaskConfig.build( |
| ConfigurationManager.applyDefaultsIfUnset(instanceRewrite.getRewrittenTask())); |
| boolean changed = storeProvider.getUnsafeTaskStore().unsafeModifyInPlace( |
| task.get().getTaskId(), newConfiguration); |
| if (!changed) { |
| error = Optional.of("Did not change " + task.get().getTaskId()); |
| } |
| } |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unhandled command type " + command.getSetField()); |
| } |
| |
| return error; |
| } |
| |
| @Override |
| public Response getVersion() { |
| return new Response() |
| .setResponseCode(OK) |
| .setResult(Result.getVersionResult(CURRENT_API_VERSION)); |
| } |
| |
| @Override |
| public Response addInstances( |
| AddInstancesConfig config, |
| @Nullable Lock mutableLock, |
| SessionKey session) { |
| |
| checkNotNull(config); |
| checkNotNull(session); |
| checkNotBlank(config.getInstanceIds()); |
| IJobKey jobKey = JobKeys.assertValid(IJobKey.build(config.getKey())); |
| |
| Response resp = new Response(); |
| try { |
| sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole())); |
| ITaskConfig task = ConfigurationManager.validateAndPopulate( |
| ITaskConfig.build(config.getTaskConfig())); |
| |
| if (cronJobManager.hasJob(jobKey)) { |
| return resp.setResponseCode(INVALID_REQUEST) |
| .setMessage("Cron jobs are not supported here."); |
| } |
| |
| lockManager.validateIfLocked( |
| ILockKey.build(LockKey.job(jobKey.newBuilder())), |
| Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER)); |
| |
| schedulerCore.addInstances(jobKey, ImmutableSet.copyOf(config.getInstanceIds()), task); |
| return resp.setResponseCode(OK).setMessage("Successfully added instances."); |
| } catch (AuthFailedException e) { |
| return resp.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| } catch (LockException e) { |
| return resp.setResponseCode(LOCK_ERROR).setMessage(e.getMessage()); |
| } catch (TaskDescriptionException | ScheduleException e) { |
| return resp.setResponseCode(INVALID_REQUEST).setMessage(e.getMessage()); |
| } |
| } |
| |
| private String getRoleFromLockKey(ILockKey lockKey) { |
| switch (lockKey.getSetField()) { |
| case JOB: |
| JobKeys.assertValid(lockKey.getJob()); |
| return lockKey.getJob().getRole(); |
| default: |
| throw new IllegalArgumentException("Unhandled LockKey: " + lockKey.getSetField()); |
| } |
| } |
| |
| @Override |
| public Response acquireLock(LockKey mutableLockKey, SessionKey session) { |
| checkNotNull(mutableLockKey); |
| checkNotNull(session); |
| |
| ILockKey lockKey = ILockKey.build(mutableLockKey); |
| Response response = new Response(); |
| |
| try { |
| SessionContext context = sessionValidator.checkAuthenticated( |
| session, |
| ImmutableSet.of(getRoleFromLockKey(lockKey))); |
| |
| ILock lock = lockManager.acquireLock(lockKey, context.getIdentity()); |
| response.setResult(Result.acquireLockResult( |
| new AcquireLockResult().setLock(lock.newBuilder()))); |
| |
| return response.setResponseCode(OK).setMessage("Lock has been acquired."); |
| } catch (AuthFailedException e) { |
| return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| } catch (LockException e) { |
| return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage()); |
| } |
| } |
| |
| @Override |
| public Response releaseLock(Lock mutableLock, LockValidation validation, SessionKey session) { |
| checkNotNull(mutableLock); |
| checkNotNull(validation); |
| checkNotNull(session); |
| |
| Response response = new Response(); |
| ILock lock = ILock.build(mutableLock); |
| |
| try { |
| sessionValidator.checkAuthenticated( |
| session, |
| ImmutableSet.of(getRoleFromLockKey(lock.getKey()))); |
| |
| if (validation == LockValidation.CHECKED) { |
| lockManager.validateIfLocked(lock.getKey(), Optional.of(lock)); |
| } |
| lockManager.releaseLock(lock); |
| return response.setResponseCode(OK).setMessage("Lock has been released."); |
| } catch (AuthFailedException e) { |
| return response.setResponseCode(AUTH_FAILED).setMessage(e.getMessage()); |
| } catch (LockException e) { |
| return response.setResponseCode(ResponseCode.LOCK_ERROR).setMessage(e.getMessage()); |
| } |
| } |
| |
| @VisibleForTesting |
| static Optional<String> transitionMessage(String user) { |
| return Optional.of("Transition forced by " + user); |
| } |
| } |