// blob: edaec0a62ec82728eea632d9471c9ea5970f40cd
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.restful;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
import org.apache.shardingsphere.elasticjob.cloud.event.rdb.JobEventRdbConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.event.rdb.JobEventRdbSearch;
import org.apache.shardingsphere.elasticjob.cloud.event.type.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationGsonFactory;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos.FacadeService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.failover.FailoverTaskInfo;
import org.apache.shardingsphere.elasticjob.cloud.statistics.StatisticInterval;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobRegisterStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.task.TaskResultStatistics;
import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext;
import org.apache.shardingsphere.elasticjob.cloud.event.type.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.cloud.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobExecutionTypeStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobRunningStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobTypeStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.task.TaskRunningStatistics;
import org.apache.shardingsphere.elasticjob.cloud.util.json.GsonFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jettison.json.JSONException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Cloud job restful api.
*/
@Path("/job")
@Slf4j
public final class CloudJobRestfulApi {
// Registry center shared by all resource instances; assigned in init(...) and required to be non-null by the constructor.
private static CoordinatorRegistryCenter regCenter;
// Job event search; assigned in init(...) and left null when no RDB event configuration is present.
private static JobEventRdbSearch jobEventRdbSearch;
// Producer manager used for register/update/deregister and for rescheduling/unscheduling jobs; assigned in init(...).
private static ProducerManager producerManager;
// Per-instance collaborators, each created from regCenter in the constructor.
private final CloudJobConfigurationService configService;
private final FacadeService facadeService;
private final StatisticManager statisticManager;
/**
 * Create a resource instance backed by the statically configured registry center.
 *
 * <p>{@code init(regCenter, producerManager)} must have been called first, because the
 * configuration service, facade service and statistic manager are all built from the
 * static registry center.
 *
 * @throws NullPointerException if the registry center has not been set yet
 */
public CloudJobRestfulApi() {
    // Fail with an explanatory message instead of a bare NullPointerException when init(...) was skipped.
    Preconditions.checkNotNull(regCenter, "Registry center is not initialized, CloudJobRestfulApi.init must be called first.");
    configService = new CloudJobConfigurationService(regCenter);
    facadeService = new FacadeService(regCenter);
    // An absent RDB configuration is passed here on purpose, exactly as before.
    // NOTE(review): presumably getInstance returns an already-created shared instance when one exists - confirm.
    statisticManager = StatisticManager.getInstance(regCenter, Optional.<JobEventRdbConfiguration>absent());
}
/**
 * Initialize the static collaborators used by every instance of this resource.
 *
 * <p>Stores the registry center and producer manager, registers the Gson type adapter for
 * {@code CloudJobConfiguration}, and creates the job event search only when the bootstrap
 * environment provides an RDB event configuration.
 *
 * @param regCenter registry center
 * @param producerManager producer manager
 */
public static void init(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
    CloudJobRestfulApi.regCenter = regCenter;
    CloudJobRestfulApi.producerManager = producerManager;
    GsonFactory.registerTypeAdapter(CloudJobConfiguration.class, new CloudJobConfigurationGsonFactory.CloudJobConfigurationGsonTypeAdapter());
    Optional<JobEventRdbConfiguration> rdbConfig = BootstrapEnvironment.getInstance().getJobEventRdbConfiguration();
    // A null search marks event queries as unavailable; see isRdbConfigured().
    jobEventRdbSearch = rdbConfig.isPresent() ? new JobEventRdbSearch(rdbConfig.get().getDataSource()) : null;
}
/**
* Register cloud job.
*
* <p>Passes the configuration straight to the producer manager; this method performs no checks of its own.
*
* @param jobConfig cloud job configuration, taken from the JSON request body
*/
@POST
@Path("/register")
@Consumes(MediaType.APPLICATION_JSON)
public void register(final CloudJobConfiguration jobConfig) {
producerManager.register(jobConfig);
}
/**
* Update cloud job.
*
* <p>Passes the configuration straight to the producer manager; this method performs no checks of its own.
*
* @param jobConfig cloud job configuration, taken from the JSON request body
*/
@PUT
@Path("/update")
@Consumes(MediaType.APPLICATION_JSON)
public void update(final CloudJobConfiguration jobConfig) {
producerManager.update(jobConfig);
}
/**
* Deregister cloud job.
*
* <p>Passes the job name straight to the producer manager.
*
* @param jobName job name; the parameter has no path or query binding, so it is read from the request body
*/
@DELETE
@Path("/deregister")
@Consumes(MediaType.APPLICATION_JSON)
public void deregister(final String jobName) {
producerManager.deregister(jobName);
}
/**
* Check whether the cloud job is disabled or not.
*
* <p>The answer comes directly from the facade service.
*
* @param jobName job name, taken from the request path
* @return true if the job is disabled, otherwise false
* @throws JSONException declared in the signature; nothing in this method body throws it directly
*/
@GET
@Path("/{jobName}/disable")
@Produces(MediaType.APPLICATION_JSON)
public boolean isDisabled(@PathParam("jobName") final String jobName) throws JSONException {
return facadeService.isJobDisabled(jobName);
}
/**
 * Enable cloud job.
 *
 * <p>Does nothing when no configuration is stored for the job name. Otherwise the job is
 * enabled through the facade service and then rescheduled by the producer manager.
 *
 * @param jobName job name, taken from the request path
 * @throws JSONException declared in the signature; nothing in this method body throws it directly
 */
@POST
@Path("/{jobName}/enable")
public void enable(@PathParam("jobName") final String jobName) throws JSONException {
    if (!configService.load(jobName).isPresent()) {
        // Unknown job: silently ignore the request.
        return;
    }
    facadeService.enableJob(jobName);
    producerManager.reschedule(jobName);
}
/**
 * Disable cloud job.
 *
 * <p>Does nothing when no configuration is stored for the job name. Otherwise the job is
 * disabled through the facade service and then unscheduled by the producer manager.
 *
 * @param jobName job name, taken from the request path
 */
@POST
@Path("/{jobName}/disable")
public void disable(@PathParam("jobName") final String jobName) {
    Optional<CloudJobConfiguration> storedConfig = configService.load(jobName);
    if (!storedConfig.isPresent()) {
        // Unknown job: silently ignore the request.
        return;
    }
    facadeService.disableJob(jobName);
    producerManager.unschedule(jobName);
}
/**
 * Trigger job once.
 *
 * <p>A job whose stored configuration has execution type {@code DAEMON} is rejected. Any
 * other job name, including one with no stored configuration, is handed to the facade
 * service as a transient job.
 *
 * @param jobName job name; the parameter has no path or query binding, so it is read from the request body
 * @throws JobSystemException if the job is configured as a daemon job
 */
@POST
@Path("/trigger")
@Consumes(MediaType.APPLICATION_JSON)
public void trigger(final String jobName) {
    Optional<CloudJobConfiguration> storedConfig = configService.load(jobName);
    if (storedConfig.isPresent()) {
        CloudJobExecutionType executionType = storedConfig.get().getJobExecutionType();
        if (CloudJobExecutionType.DAEMON == executionType) {
            throw new JobSystemException("Daemon job '%s' cannot support trigger.", jobName);
        }
    }
    facadeService.addTransient(jobName);
}
/**
 * Query job detail.
 *
 * @param jobName job name, taken from the request path
 * @return an OK response carrying the job configuration, or a NOT_FOUND response when no
 *         configuration is stored for the job name
 */
@GET
@Path("/jobs/{jobName}")
@Consumes(MediaType.APPLICATION_JSON)
public Response detail(@PathParam("jobName") final String jobName) {
    Optional<CloudJobConfiguration> storedConfig = configService.load(jobName);
    if (storedConfig.isPresent()) {
        return Response.ok(storedConfig.get()).build();
    }
    return Response.status(Response.Status.NOT_FOUND).build();
}
/**
* Find all jobs.
*
* <p>Returns whatever the configuration service loads, without filtering.
*
* @return all job configurations loaded by the configuration service
*/
@GET
@Path("/jobs")
@Consumes(MediaType.APPLICATION_JSON)
public Collection<CloudJobConfiguration> findAllJobs() {
return configService.loadAll();
}
/**
 * Find all running tasks.
 *
 * <p>The facade service returns running tasks grouped in a map; the groups are flattened
 * into one list here and the map keys are not used.
 *
 * @return all running tasks
 */
@GET
@Path("tasks/running")
@Consumes(MediaType.APPLICATION_JSON)
public Collection<TaskContext> findAllRunningTasks() {
    Collection<Set<TaskContext>> taskGroups = facadeService.getAllRunningTasks().values();
    List<TaskContext> allTasks = new LinkedList<>();
    for (Set<TaskContext> group : taskGroups) {
        allTasks.addAll(group);
    }
    return allTasks;
}
/**
 * Find all ready tasks.
 *
 * <p>Each entry reported by the facade service becomes one two-key map: {@code "jobName"}
 * holds the entry key and {@code "times"} holds the entry value rendered as a string.
 *
 * @return collection of all ready tasks
 */
@GET
@Path("tasks/ready")
@Consumes(MediaType.APPLICATION_JSON)
public Collection<Map<String, String>> findAllReadyTasks() {
    Map<String, Integer> readyByJob = facadeService.getAllReadyTasks();
    List<Map<String, String>> rows = new ArrayList<>(readyByJob.size());
    for (Entry<String, Integer> entry : readyByJob.entrySet()) {
        // Exactly two keys are stored, so the map is sized to fit without resizing.
        Map<String, String> row = new HashMap<>(2, 1);
        row.put("jobName", entry.getKey());
        row.put("times", String.valueOf(entry.getValue()));
        rows.add(row);
    }
    return rows;
}
/**
 * Find all failover tasks.
 *
 * <p>The facade service returns failover tasks grouped in a map; the groups are flattened
 * into one list here and the map keys are not used.
 *
 * @return collection of all the failover tasks
 */
@GET
@Path("tasks/failover")
@Consumes(MediaType.APPLICATION_JSON)
public Collection<FailoverTaskInfo> findAllFailoverTasks() {
    Collection<Collection<FailoverTaskInfo>> taskGroups = facadeService.getAllFailoverTasks().values();
    List<FailoverTaskInfo> allTasks = new LinkedList<>();
    for (Collection<FailoverTaskInfo> group : taskGroups) {
        allTasks.addAll(group);
    }
    return allTasks;
}
/**
 * Find job execution events.
 *
 * <p>When no RDB event search is configured, an empty result with a total of 0 is
 * returned. Otherwise the search condition is built from the request's query parameters,
 * with {@code jobName}, {@code taskId}, {@code ip} and {@code isSuccess} accepted as
 * filter fields.
 *
 * @param info uri info
 * @return job execution event
 * @throws ParseException if the startTime or endTime query parameter cannot be parsed
 */
@GET
@Path("events/executions")
@Consumes(MediaType.APPLICATION_JSON)
public JobEventRdbSearch.Result<JobExecutionEvent> findJobExecutionEvents(@Context final UriInfo info) throws ParseException {
    if (isRdbConfigured()) {
        String[] filterFields = {"jobName", "taskId", "ip", "isSuccess"};
        return jobEventRdbSearch.findJobExecutionEvents(buildCondition(info, filterFields));
    }
    return new JobEventRdbSearch.Result<>(0, Collections.<JobExecutionEvent>emptyList());
}
/**
 * Find job status trace events.
 *
 * <p>When no RDB event search is configured, an empty result with a total of 0 is
 * returned. Otherwise the search condition is built from the request's query parameters,
 * with {@code jobName}, {@code taskId}, {@code slaveId}, {@code source},
 * {@code executionType} and {@code state} accepted as filter fields.
 *
 * @param info uri info
 * @return job status trace event
 * @throws ParseException if the startTime or endTime query parameter cannot be parsed
 */
@GET
@Path("events/statusTraces")
@Consumes(MediaType.APPLICATION_JSON)
public JobEventRdbSearch.Result<JobStatusTraceEvent> findJobStatusTraceEvents(@Context final UriInfo info) throws ParseException {
    if (isRdbConfigured()) {
        String[] filterFields = {"jobName", "taskId", "slaveId", "source", "executionType", "state"};
        return jobEventRdbSearch.findJobStatusTraceEvents(buildCondition(info, filterFields));
    }
    return new JobEventRdbSearch.Result<>(0, Collections.<JobStatusTraceEvent>emptyList());
}
/**
 * Tell whether the job event search is available.
 *
 * @return true when the static job event search has been created, false when it is null
 */
private boolean isRdbConfigured() {
    return jobEventRdbSearch != null;
}
/**
 * Build the event search condition from the request's query parameters.
 *
 * <p>Parameters read: {@code per_page} (default 10), {@code page} (default 1),
 * {@code sort}, {@code order}, {@code startTime} and {@code endTime} (both in
 * {@code yyyy-MM-dd HH:mm:ss} format, null when missing or empty), plus the filter fields
 * named in {@code params}.
 *
 * <p>A non-numeric {@code per_page} or {@code page} surfaces as the unchecked
 * {@link NumberFormatException} raised by {@link Integer#parseInt(String)}.
 *
 * @param info uri info carrying the query parameters
 * @param params names of the query parameters accepted as filter fields
 * @return search condition
 * @throws ParseException if startTime or endTime does not match the expected format
 */
private JobEventRdbSearch.Condition buildCondition(final UriInfo info, final String[] params) throws ParseException {
    String perPageText = info.getQueryParameters().getFirst("per_page");
    int perPage = Strings.isNullOrEmpty(perPageText) ? 10 : Integer.parseInt(perPageText);
    String pageText = info.getQueryParameters().getFirst("page");
    int page = Strings.isNullOrEmpty(pageText) ? 1 : Integer.parseInt(pageText);
    String sort = info.getQueryParameters().getFirst("sort");
    String order = info.getQueryParameters().getFirst("order");
    Map<String, Object> fields = getQueryParameters(info, params);
    // A fresh formatter per call: SimpleDateFormat is not safe to share between threads.
    SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String startTimeText = info.getQueryParameters().getFirst("startTime");
    Date startTime = Strings.isNullOrEmpty(startTimeText) ? null : timeFormat.parse(startTimeText);
    String endTimeText = info.getQueryParameters().getFirst("endTime");
    Date endTime = Strings.isNullOrEmpty(endTimeText) ? null : timeFormat.parse(endTimeText);
    return new JobEventRdbSearch.Condition(perPage, page, sort, order, startTime, endTime, fields);
}
/**
 * Collect the non-empty query parameters among the given names.
 *
 * @param info uri info carrying the query parameters
 * @param params names of the query parameters to look up
 * @return map from parameter name to its first value, holding only names whose first
 *         value is neither null nor empty
 */
private Map<String, Object> getQueryParameters(final UriInfo info, final String[] params) {
    Map<String, Object> presentValues = new HashMap<>();
    for (String name : params) {
        String firstValue = info.getQueryParameters().getFirst(name);
        if (Strings.isNullOrEmpty(firstValue)) {
            continue;
        }
        presentValues.put(name, firstValue);
    }
    return presentValues;
}
/**
 * Find task result statistics.
 *
 * <p>Only the value {@code last24hours} is recognized, and it returns the statistic
 * manager's daily task result statistics. Any other value, including a missing one,
 * yields an empty list.
 *
 * @param since time span
 * @return task result statistics
 */
@GET
@Path("/statistics/tasks/results")
@Consumes(MediaType.APPLICATION_JSON)
public List<TaskResultStatistics> findTaskResultStatistics(@QueryParam("since") final String since) {
    if (!"last24hours".equals(since)) {
        return Collections.emptyList();
    }
    return statisticManager.findTaskResultStatisticsDaily();
}
/**
 * Get task result statistics.
 *
 * <p>Recognized periods are {@code online}, {@code lastWeek}, {@code lastHour} and
 * {@code lastMinute}. Any other period yields a placeholder built from
 * {@code (0, 0, StatisticInterval.DAY, new Date())}.
 *
 * @param period time period
 * @return task result statistics
 */
@GET
@Path("/statistics/tasks/results/{period}")
@Consumes(MediaType.APPLICATION_JSON)
public TaskResultStatistics getTaskResultStatistics(@PathParam("period") final String period) {
    if ("online".equals(period)) {
        return statisticManager.getTaskResultStatisticsSinceOnline();
    }
    if ("lastWeek".equals(period)) {
        return statisticManager.getTaskResultStatisticsWeekly();
    }
    if ("lastHour".equals(period)) {
        return statisticManager.findLatestTaskResultStatistics(StatisticInterval.HOUR);
    }
    if ("lastMinute".equals(period)) {
        return statisticManager.findLatestTaskResultStatistics(StatisticInterval.MINUTE);
    }
    // Unrecognized period: answer with a placeholder instead of failing.
    return new TaskResultStatistics(0, 0, StatisticInterval.DAY, new Date());
}
/**
 * Find task running statistics.
 *
 * <p>Only the value {@code lastWeek} is recognized. Any other value, including a missing
 * one, yields an empty list.
 *
 * @param since time span
 * @return task running statistics
 */
@GET
@Path("/statistics/tasks/running")
@Consumes(MediaType.APPLICATION_JSON)
public List<TaskRunningStatistics> findTaskRunningStatistics(@QueryParam("since") final String since) {
    boolean lastWeekRequested = "lastWeek".equals(since);
    return lastWeekRequested ? statisticManager.findTaskRunningStatisticsWeekly() : Collections.<TaskRunningStatistics>emptyList();
}
/**
* Get job type statistics.
*
* <p>Returns the statistic manager's result unchanged.
*
* @return job type statistics
*/
@GET
@Path("/statistics/jobs/type")
@Consumes(MediaType.APPLICATION_JSON)
public JobTypeStatistics getJobTypeStatistics() {
return statisticManager.getJobTypeStatistics();
}
/**
* Get job execution type statistics.
*
* <p>Returns the statistic manager's result unchanged.
*
* @return job execution type statistics
*/
@GET
@Path("/statistics/jobs/executionType")
@Consumes(MediaType.APPLICATION_JSON)
public JobExecutionTypeStatistics getJobExecutionTypeStatistics() {
return statisticManager.getJobExecutionTypeStatistics();
}
/**
 * Find job running statistics in the recent week.
 *
 * <p>Only the value {@code lastWeek} is recognized. Any other value, including a missing
 * one, yields an empty list.
 *
 * @param since time span
 * @return collection of job running statistics in the recent week
 */
@GET
@Path("/statistics/jobs/running")
@Consumes(MediaType.APPLICATION_JSON)
public List<JobRunningStatistics> findJobRunningStatistics(@QueryParam("since") final String since) {
    if (!"lastWeek".equals(since)) {
        return Collections.emptyList();
    }
    return statisticManager.findJobRunningStatisticsWeekly();
}
/**
* Find job register statistics.
*
* <p>Returns the statistic manager's since-online result unchanged.
*
* @return collection of job register statistics since online
*/
@GET
@Path("/statistics/jobs/register")
@Consumes(MediaType.APPLICATION_JSON)
public List<JobRegisterStatistics> findJobRegisterStatistics() {
return statisticManager.findJobRegisterStatisticsSinceOnline();
}
}