blob: ff99f98540b171f1e7db00d32b569bd3adf51410 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.handler;
import org.apache.log4j.Logger;
import org.apache.sqoop.audit.AuditLoggerManager;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.JobValidationBean;
import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError;
import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
/**
* The job request handler supports the following resources:
*
* GET /v1/job/:jid
* Return details about one particular job with id :jid, or about all of
* them if :jid equals "all".
*
* POST /v1/job
* Create new job
*
* PUT /v1/job/:jid
* Update job with id :jid.
*
* PUT /v1/job/:jid/enable
* Enable job with id :jid
*
* PUT /v1/job/:jid/disable
* Disable job with id :jid
*
* DELETE /v1/job/:jid
* Remove job with id :jid
*
* Planned resources:
*
* GET /v1/job
* Get brief list of all jobs present in the system. This resource is not yet
* implemented.
*/
public class JobRequestHandler implements RequestHandler {

  private static final Logger LOG =
      Logger.getLogger(JobRequestHandler.class);

  /** Last URL element of a PUT request that enables a job. */
  private static final String ENABLE = "enable";

  /** Last URL element of a PUT request that disables a job. */
  private static final String DISABLE = "disable";

  public JobRequestHandler() {
    LOG.info("JobRequestHandler initialized");
  }

  /**
   * Dispatch the request to the operation matching its HTTP method.
   *
   * A PUT request is treated as enable/disable when the last URL element is
   * "enable"/"disable", and as a job update otherwise.
   *
   * @param ctx Context object
   * @return Bean produced by the selected operation, or null when the method
   *         is not one of GET, POST, PUT or DELETE
   */
  @Override
  public JsonBean handleEvent(RequestContext ctx) {
    switch (ctx.getMethod()) {
      case GET:
        return getJobs(ctx);
      case POST:
        return createUpdateJob(ctx, false);
      case PUT: {
        String lastElement = ctx.getLastURLElement();
        if (ENABLE.equals(lastElement)) {
          return enableJob(ctx, true);
        } else if (DISABLE.equals(lastElement)) {
          return enableJob(ctx, false);
        } else {
          return createUpdateJob(ctx, true);
        }
      }
      case DELETE:
        return deleteJob(ctx);
    }
    return null;
  }

  /**
   * Delete job from metadata repository.
   *
   * The job id is taken from the last URL element; the deletion is recorded
   * in the audit log before the repository is touched.
   *
   * @param ctx Context object
   * @return Empty bean
   * @throws NumberFormatException if the last URL element is not a number
   */
  private JsonBean deleteJob(RequestContext ctx) {
    String sjid = ctx.getLastURLElement();
    long jid = Long.parseLong(sjid);

    AuditLoggerManager.getInstance()
        .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
            "delete", "job", sjid);

    Repository repository = RepositoryManager.getInstance().getRepository();
    repository.deleteJob(jid);

    return JsonBean.EMPTY_BEAN;
  }

  /**
   * Update or create job metadata in repository.
   *
   * The submitted job is checked against the registered form structures and
   * validated by both connectors and the framework. The repository is only
   * modified when the worst validation status still allows proceeding; the
   * validation results are returned in every case.
   *
   * @param ctx Context object
   * @param update True to update an existing job, false to create a new one
   * @return Validation bean object
   * @throws SqoopException if the request content can't be read, is not a
   *         JSON object, doesn't contain exactly one job, has an unexpected
   *         form structure, or a connector doesn't support its direction
   */
  private JsonBean createUpdateJob(RequestContext ctx, boolean update) {
    // TODO: for updates, check that the id in the URL equals the id sent in
    // the payload and report an error otherwise.
    String username = ctx.getUserName();

    JobBean bean = readJobBean(ctx);

    // Get job object
    List<MJob> jobs = bean.getJobs();
    if (jobs.size() != 1) {
      throw new SqoopException(ServerError.SERVER_0003,
          "Expected one job metadata but got " + jobs.size());
    }

    // Job object
    MJob job = jobs.get(0);

    ConnectorManager connectorManager = ConnectorManager.getInstance();
    FrameworkManager frameworkManager = FrameworkManager.getInstance();

    // Verify that user is not trying to spoof us
    MJobForms fromConnectorForms = connectorManager
        .getConnectorMetadata(job.getConnectorId(Direction.FROM))
        .getJobForms(Direction.FROM);
    MJobForms toConnectorForms = connectorManager
        .getConnectorMetadata(job.getConnectorId(Direction.TO))
        .getJobForms(Direction.TO);
    MJobForms frameworkForms = frameworkManager.getFramework().getJobForms();

    if (!fromConnectorForms.equals(job.getConnectorPart(Direction.FROM))
        || !frameworkForms.equals(job.getFrameworkPart())
        || !toConnectorForms.equals(job.getConnectorPart(Direction.TO))) {
      throw new SqoopException(ServerError.SERVER_0003,
          "Detected incorrect form structure");
    }

    // Responsible connectors for this session
    SqoopConnector fromConnector =
        connectorManager.getConnector(job.getConnectorId(Direction.FROM));
    SqoopConnector toConnector =
        connectorManager.getConnector(job.getConnectorId(Direction.TO));

    if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
      throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName()
          + " does not support FROM direction.");
    }

    if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
      throw new SqoopException(ServerError.SERVER_0004, "Connector " + toConnector.getClass().getCanonicalName()
          + " does not support TO direction.");
    }

    // Get validator objects
    Validator fromConnectorValidator = fromConnector.getValidator();
    Validator frameworkValidator = frameworkManager.getValidator();
    Validator toConnectorValidator = toConnector.getValidator();

    // We need translate forms to configuration objects
    Object fromConnectorConfig = ClassUtils.instantiate(
        fromConnector.getJobConfigurationClass(Direction.FROM));
    Object frameworkConfig = ClassUtils.instantiate(
        frameworkManager.getJobConfigurationClass());
    Object toConnectorConfig = ClassUtils.instantiate(
        toConnector.getJobConfigurationClass(Direction.TO));

    FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromConnectorConfig);
    FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkConfig);
    FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toConnectorConfig);

    // Validate all parts
    Validation fromConnectorValidation =
        fromConnectorValidator.validateJob(fromConnectorConfig);
    Validation frameworkValidation =
        frameworkValidator.validateJob(frameworkConfig);
    Validation toConnectorValidation =
        toConnectorValidator.validateJob(toConnectorConfig);

    Status finalStatus = Status.getWorstStatus(fromConnectorValidation.getStatus(),
        frameworkValidation.getStatus(), toConnectorValidation.getStatus());

    // Return back validations in all cases
    JobValidationBean outputBean =
        new JobValidationBean(fromConnectorValidation, frameworkValidation, toConnectorValidation);

    // If we're good enough let's perform the action
    if (finalStatus.canProceed()) {
      if (update) {
        AuditLoggerManager.getInstance()
            .logAuditEvent(username, ctx.getRequest().getRemoteAddr(),
                "update", "job", String.valueOf(job.getPersistenceId()));

        job.setLastUpdateUser(username);
        RepositoryManager.getInstance().getRepository().updateJob(job);
      } else {
        job.setCreationUser(username);
        job.setLastUpdateUser(username);
        RepositoryManager.getInstance().getRepository().createJob(job);
        outputBean.setId(job.getPersistenceId());

        // Audited after creation so that the event carries the persistence
        // id read back from the job once createJob() has run.
        AuditLoggerManager.getInstance()
            .logAuditEvent(username, ctx.getRequest().getRemoteAddr(),
                "create", "job", String.valueOf(job.getPersistenceId()));
      }
    }

    return outputBean;
  }

  /**
   * Parse the request body into a job bean.
   *
   * JSONValue.parse() signals malformed input by returning null and may also
   * return non-object values (arrays, strings, numbers), so the result is
   * checked before it is cast and handed to the bean.
   *
   * @param ctx Context object
   * @return Job bean restored from the request content
   * @throws SqoopException if the content can't be read or is not a JSON object
   */
  private JobBean readJobBean(RequestContext ctx) {
    Object parsed;
    try {
      parsed = JSONValue.parse(ctx.getRequest().getReader());
    } catch (IOException e) {
      throw new SqoopException(ServerError.SERVER_0003,
          "Can't read request content", e);
    }

    if (!(parsed instanceof JSONObject)) {
      throw new SqoopException(ServerError.SERVER_0003,
          "Request content is not a valid JSON object");
    }

    JobBean bean = new JobBean();
    bean.restore((JSONObject) parsed);
    return bean;
  }

  /**
   * Return details about one particular job, or about all jobs when the last
   * URL element is "all".
   *
   * The returned bean also carries the resource bundles of the connectors
   * used by the returned job(s) and the framework resource bundle, resolved
   * for the locale of the request.
   *
   * @param ctx Context object
   * @return Job bean with the requested job(s) and associated bundles
   * @throws NumberFormatException if the last URL element is neither "all"
   *         nor a number
   */
  private JsonBean getJobs(RequestContext ctx) {
    String sjid = ctx.getLastURLElement();
    JobBean bean;

    AuditLoggerManager.getInstance()
        .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
            "get", "job", sjid);

    Locale locale = ctx.getAcceptLanguageHeader();
    Repository repository = RepositoryManager.getInstance().getRepository();

    if (sjid.equals("all")) {
      List<MJob> jobs = repository.findJobs();
      bean = new JobBean(jobs);

      // Add associated resources into the bean
      for (MJob job : jobs) {
        addConnectorBundles(bean, job, locale);
      }
    } else {
      long jid = Long.parseLong(sjid);

      MJob job = repository.findJob(jid);
      bean = new JobBean(job);
      addConnectorBundles(bean, job, locale);
    }

    // Sent framework resource bundle in all cases
    bean.setFrameworkBundle(FrameworkManager.getInstance().getBundle(locale));

    return bean;
  }

  /**
   * Attach the resource bundles of the job's FROM and TO connectors to the
   * bean, skipping connectors whose bundle the bean already has.
   *
   * @param bean Bean to add the bundles to
   * @param job Job whose connectors should be covered
   * @param locale Locale the bundles are requested for
   */
  private void addConnectorBundles(JobBean bean, MJob job, Locale locale) {
    Direction[] directions = {Direction.FROM, Direction.TO};
    for (Direction direction : directions) {
      long connectorId = job.getConnectorId(direction);
      if (!bean.hasConnectorBundle(connectorId)) {
        bean.addConnectorBundle(connectorId,
            ConnectorManager.getInstance().getResourceBundle(connectorId, locale));
      }
    }
  }

  /**
   * Enable or disable a job.
   *
   * The job id is read from the second-to-last URL element, as in
   * /v1/job/:jid/enable and /v1/job/:jid/disable.
   *
   * NOTE(review): unlike the other operations of this handler, this one does
   * not write an audit event - confirm whether that is intentional.
   *
   * @param ctx Context object
   * @param enabled True to enable the job, false to disable it
   * @return Empty bean
   * @throws NumberFormatException if the job id element is not a number
   */
  private JsonBean enableJob(RequestContext ctx, boolean enabled) {
    String[] elements = ctx.getUrlElements();
    String sjid = elements[elements.length - 2];
    long jid = Long.parseLong(sjid);

    Repository repository = RepositoryManager.getInstance().getRepository();
    repository.enableJob(jid, enabled);

    return JsonBean.EMPTY_BEAN;
  }
}