blob: d51c8604c92835e2b758e2c13dbff28e0f6c6883 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.tools.tool;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.cli.SqoopGnuParser;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.JSONUtils;
import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.model.MValidator;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.tools.ConfiguredTool;
import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.ConfigValidationResult;
import org.apache.sqoop.validation.ConfigValidationRunner;
import org.apache.sqoop.validation.Status;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
/**
 * Load user-created content of Sqoop repository from a JSON formatted file.
 * The loaded connector IDs will be modified to match existing connectors.
 */
public class RepositoryLoadTool extends ConfiguredTool {

  public static final Logger LOG = Logger.getLogger(RepositoryLoadTool.class);

  /**
   * When true, {@link #load(JSONObject)} skips initializing the repository,
   * connector manager and driver (set via {@link #setInTest(boolean)}).
   */
  private boolean isInTest = false;

  /**
   * Parses the required {@code -i/--input <filename>} option, reads that file as
   * UTF-8 JSON and loads its contents into the repository.
   *
   * @param arguments command line arguments passed to the tool
   * @return true if the load completed; false on a missing/unreadable file,
   *         bad command line arguments, or any load failure
   */
  @SuppressWarnings("static-access")
  @Override
  public boolean runToolWithConfiguration(String[] arguments) {
    Options options = new Options();
    options.addOption(OptionBuilder.isRequired().hasArg().withArgName("filename")
        .withLongOpt("input").create('i'));

    CommandLineParser parser = new SqoopGnuParser();

    try {
      CommandLine line = parser.parse(options, arguments);
      String inputFileName = line.getOptionValue('i');

      LOG.info("Reading JSON from file " + inputFileName);

      try (InputStream input = new FileInputStream(inputFileName)) {
        String jsonTxt = IOUtils.toString(input, Charsets.UTF_8);
        JSONObject json = JSONUtils.parse(jsonTxt);
        return load(json);
      }
    } catch (FileNotFoundException e) {
      LOG.error("Repository dump file not found:", e);
      System.out.println("Input file not found. Please check Server logs for details.");
      return false;
    } catch (IOException e) {
      LOG.error("Unable to read repository dump file:", e);
      System.out.println("Unable to read input file. Please check Server logs for details.");
      return false;
    } catch (ParseException e) {
      LOG.error("Error parsing command line arguments:", e);
      System.out
          .println("Error parsing command line arguments. Please check Server logs for details.");
      return false;
    }
  }

  /**
   * Loads links, then jobs, then submissions from the parsed dump.
   *
   * Objects that reference connectors, links or jobs that do not exist in the
   * target repository are dropped (with a warning) before loading. A link or job
   * that fails validation aborts the whole load; anything already created stays.
   *
   * @param repo the parsed dump; must contain metadata, links, jobs and submissions keys
   * @return true on success, false if the dump is malformed or a link/job fails to load
   */
  private boolean load(JSONObject repo) {
    // Validate that loading JSON into repository is supported
    JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA);
    if (metadata == null) {
      LOG.error("Malformed JSON. Key " + JSONConstants.METADATA + " not found.");
      return false;
    }
    if (!validateMetadata(metadata)) {
      LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load.");
      return false;
    }

    if (!isInTest) {
      // initialize repository as mutable
      RepositoryManager.getInstance().initialize(false);
      ConnectorManager.getInstance().initialize();
      Driver.getInstance().initialize();
    }

    Repository repository = RepositoryManager.getInstance().getRepository();

    // ---- Links ----
    LOG.info("Loading Connections");
    JSONObject jsonLinks = (JSONObject) repo.get(JSONConstants.LINKS);
    if (jsonLinks == null) {
      LOG.error("Malformed JSON file. Key " + JSONConstants.LINKS + " not found.");
      return false;
    }
    removeObjectIfConnectorNotExist(
        (JSONArray) jsonLinks.get(JSONConstants.LINKS),
        JSONConstants.CONNECTOR_NAME, true);

    LinkBean linksBean = new LinkBean();
    linksBean.restore(jsonLinks);
    for (MLink link : linksBean.getLinks()) {
      long newId = loadLink(link);
      if (newId == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
        LOG.error("loading connection " + link.getName() + " failed. Aborting repository load. Check log for details.");
        return false;
      }
    }
    LOG.info("Loaded " + linksBean.getLinks().size() + " links");

    // ---- Jobs ----
    LOG.info("Loading Jobs");
    JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS);
    if (jsonJobs == null) {
      LOG.error("Malformed JSON file. Key " + JSONConstants.JOBS + " not found.");
      return false;
    }
    // The same array instance is filtered in place by each pass below.
    JSONArray jobsArray = (JSONArray) jsonJobs.get(JSONConstants.JOBS);
    removeObjectIfConnectorNotExist(jobsArray, JSONConstants.FROM_CONNECTOR_NAME, false);
    removeObjectIfConnectorNotExist(jobsArray, JSONConstants.TO_CONNECTOR_NAME, false);
    removeJobIfLinkNotExist(jobsArray, JSONConstants.FROM_LINK_NAME);
    removeJobIfLinkNotExist(jobsArray, JSONConstants.TO_LINK_NAME);

    JobBean jobsBean = new JobBean();
    jobsBean.restore(jsonJobs);
    for (MJob job : jobsBean.getJobs()) {
      long newId = loadJob(job);
      if (newId == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
        LOG.error("loading job " + job.getName()
            + " failed. Aborting repository load. Check log for details.");
        return false;
      }
    }
    LOG.info("Loaded " + jobsBean.getJobs().size() + " jobs");

    // ---- Submissions ----
    LOG.info("Loading Submissions");
    JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS);
    if (jsonSubmissions == null) {
      LOG.error("Malformed JSON file. Key " + JSONConstants.SUBMISSIONS + " not found.");
      return false;
    }
    removeSubmissionIfJobNotExist((JSONArray) jsonSubmissions.get(JSONConstants.SUBMISSIONS));

    SubmissionBean submissionsBean = new SubmissionBean();
    submissionsBean.restore(jsonSubmissions);
    for (MSubmission submission : submissionsBean.getSubmissions()) {
      resetPersistenceId(submission);
      repository.createSubmission(submission);
    }
    LOG.info("Loaded " + submissionsBean.getSubmissions().size() + " submissions.");

    LOG.info("Repository load completed successfully.");
    return true;
  }

  /** Marks the entity as not-yet-persisted so the repository assigns it a new id. */
  private void resetPersistenceId(MPersistableEntity ent) {
    ent.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
  }

  /**
   * Even though the metadata contains version, revision, compile-date and
   * compile-user, we are only validating that the version matches for now. More
   * interesting logic can be added later.
   *
   * @param metadata the metadata object of the dump file
   * @return false if the version key is missing or differs from this build's version
   */
  private boolean validateMetadata(JSONObject metadata) {
    String jsonVersion = (String) metadata.get(JSONConstants.VERSION);
    Boolean includeSensitive = (Boolean) metadata.get(JSONConstants.INCLUDE_SENSITIVE);
    String repoVersion = VersionInfo.getBuildVersion();

    if (jsonVersion == null) {
      LOG.error("Malformed JSON. Key " + JSONConstants.VERSION + " not found in metadata.");
      return false;
    }
    if (!jsonVersion.equals(repoVersion)) {
      LOG.error("Repository version in file (" + jsonVersion
          + ") does not match this version of Sqoop (" + repoVersion + ")");
      return false;
    }

    // A missing flag is treated as "dumped without sensitive data" instead of failing on unboxing.
    if (!Boolean.TRUE.equals(includeSensitive)) {
      LOG.warn("Loading repository which was dumped without --include-sensitive=true. "
          + "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later.");
    }
    return true;
  }

  /**
   * Validates a link from the dump and, if valid, creates it in the repository.
   *
   * The link's values are upgraded into a fresh copy of the currently registered
   * connector's link config, so the persisted link uses this repository's
   * config structure. Validation runs against the link's configs as read from the dump.
   *
   * We currently pass through null as the old connector version because we do
   * not have a good way of determining what the old version of the connector is
   * here.
   *
   * @return the new persistence id, or PERSISTANCE_ID_DEFAULT if validation failed
   */
  private long loadLink(MLink link) {
    // starting by pretending we have a brand new link
    resetPersistenceId(link);

    Repository repository = RepositoryManager.getInstance().getRepository();

    MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorName());
    ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance()
        .getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader(null);

    List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
    List<MValidator> connectorValidators = mConnector.getLinkConfig().getCloneOfValidators();
    MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs, connectorValidators);

    // upgrading the configs to make sure they match the current repository
    connectorConfigUpgrader.upgradeLinkConfig(link.getConnectorLinkConfig(), newLinkConfigs);
    MLink newLink = new MLink(link, newLinkConfigs);

    // Transform config structures to objects for validations
    SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
        link.getConnectorName());

    Object connectorConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass());
    ConfigUtils.fromConfigs(link.getConnectorLinkConfig().getConfigs(), connectorConfig);

    ConfigValidationRunner validationRunner = new ConfigValidationRunner();
    ConfigValidationResult result = validationRunner.validate(connectorConfig);

    Status finalStatus = Status.getWorstStatus(result.getStatus());
    if (finalStatus.canProceed()) {
      repository.createLink(newLink);
    } else {
      LOG.error("Failed to load link:" + link.getName());
      LOG.error("Status of connector configs:" + result.getStatus().toString());
    }
    return newLink.getPersistenceId();
  }

  /**
   * Validates a job from the dump and, if valid, creates it in the repository.
   *
   * The FROM, TO and driver values are upgraded into fresh copies of the
   * currently registered connector/driver configs. This mirrors
   * {@link #loadLink(MLink)}: the persisted job uses this repository's config
   * structure instead of the one recorded in the dump. It also avoids mutating
   * the driver's own config object. Validation runs against the job's configs
   * as read from the dump.
   *
   * We currently pass through null as the old connector version because we do
   * not have a good way of determining what the old version of the connector is
   * here.
   *
   * @return the new persistence id, or PERSISTANCE_ID_DEFAULT if the job could not be loaded
   */
  private long loadJob(MJob job) {
    // starting by pretending we have a brand new job
    resetPersistenceId(job);

    ConnectorManager connectorManager = ConnectorManager.getInstance();
    MConnector mFromConnector = connectorManager.getConnectorConfigurable(job.getFromConnectorName());
    MConnector mToConnector = connectorManager.getConnectorConfigurable(job.getToConnectorName());

    MFromConfig fromTemplate = mFromConnector.getFromConfig();
    MToConfig toTemplate = mToConnector.getToConfig();
    if (fromTemplate == null || toTemplate == null) {
      LOG.error("Failed to load job:" + job.getName()
          + " (connector does not support the requested direction)");
      return MPersistableEntity.PERSISTANCE_ID_DEFAULT;
    }

    // Fresh, value-less copies of the current connector configs to upgrade the job's values into.
    MFromConfig fromConfig = new MFromConfig(fromTemplate.clone(false).getConfigs(),
        fromTemplate.getCloneOfValidators());
    MToConfig toConfig = new MToConfig(toTemplate.clone(false).getConfigs(),
        toTemplate.getCloneOfValidators());

    ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = connectorManager
        .getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader(null);
    ConnectorConfigurableUpgrader toConnectorConfigUpgrader = connectorManager
        .getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader(null);

    fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig);
    toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig);

    // Upgrade into a copy: the driver's MDriverConfig is shared state and must not be
    // overwritten by (or shared between) the loaded jobs.
    DriverUpgrader driverConfigUpgrader = Driver.getInstance().getConfigurableUpgrader(null);
    MDriver driver = Driver.getInstance().getDriver();
    MDriverConfig driverTemplate = driver.getDriverConfig();
    MDriverConfig driverConfigs = new MDriverConfig(driverTemplate.clone(false).getConfigs(),
        driverTemplate.getCloneOfValidators());
    driverConfigUpgrader.upgradeJobConfig(job.getDriverConfig(), driverConfigs);

    MJob newJob = new MJob(job, fromConfig, toConfig, driverConfigs);

    // Transform config structures to objects for validations
    SqoopConnector fromConnector = connectorManager.getSqoopConnector(job.getFromConnectorName());
    SqoopConnector toConnector = connectorManager.getSqoopConnector(job.getToConnectorName());

    Object fromConnectorConfig = ClassUtils.instantiate(
        fromConnector.getJobConfigurationClass(Direction.FROM));
    Object toConnectorConfig = ClassUtils.instantiate(
        toConnector.getJobConfigurationClass(Direction.TO));
    Object driverConfig = ClassUtils.instantiate(
        Driver.getInstance().getDriverJobConfigurationClass());

    ConfigUtils.fromConfigs(job.getFromJobConfig().getConfigs(), fromConnectorConfig);
    ConfigUtils.fromConfigs(job.getToJobConfig().getConfigs(), toConnectorConfig);
    ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);

    ConfigValidationRunner validationRunner = new ConfigValidationRunner();
    ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig);
    ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig);
    ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig);

    Status finalStatus = Status.getWorstStatus(fromConnectorConfigResult.getStatus(),
        toConnectorConfigResult.getStatus(), driverConfigResult.getStatus());

    if (finalStatus.canProceed()) {
      RepositoryManager.getInstance().getRepository().createJob(newJob);
    } else {
      LOG.error("Failed to load job:" + job.getName());
      LOG.error("Status of from connector configs:"
          + fromConnectorConfigResult.getStatus().toString());
      LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString());
      LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString());
    }
    return newJob.getPersistenceId();
  }

  /**
   * Removes, in place, every entry whose connector (read from {@code connectorNameKey})
   * is not registered in the repository.
   *
   * @param jsonArray array of link or job JSON objects; null is returned unchanged
   * @param connectorNameKey key holding the connector's unique name
   * @param isLink only selects the wording of the warning ("Link" vs "Job")
   * @return the same (filtered) array
   */
  private JSONArray removeObjectIfConnectorNotExist(JSONArray jsonArray, String connectorNameKey, boolean isLink) {
    if (jsonArray == null) {
      return null;
    }
    Set<String> connectorNames = new HashSet<String>();
    for (MConnector connector : RepositoryManager.getInstance().getRepository().findConnectors()) {
      connectorNames.add(connector.getUniqueName());
    }

    for (Iterator<?> iterator = jsonArray.iterator(); iterator.hasNext(); ) {
      JSONObject object = (JSONObject) iterator.next();
      String connectorName = (String) object.get(connectorNameKey);
      String objectName = (String) object.get(JSONConstants.NAME);

      if (!connectorNames.contains(connectorName)) {
        // If a connector doesn't exist, remove the links and jobs relating to it
        iterator.remove();
        LOG.warn((isLink ? "Link " : "Job ") + objectName + " won't be loaded because connector "
            + connectorName + " is missing.");
      }
    }
    return jsonArray;
  }

  /**
   * Removes, in place, every job whose link (read from {@code linkNameKey}) does
   * not exist in the repository.
   *
   * @param jobsJsonArray array of job JSON objects; null is returned unchanged
   * @param linkNameKey key holding the FROM or TO link name
   * @return the same (filtered) array
   */
  private JSONArray removeJobIfLinkNotExist(JSONArray jobsJsonArray, String linkNameKey) {
    if (jobsJsonArray == null) {
      return null;
    }
    Set<String> linkNames = new HashSet<String>();
    for (MLink link : RepositoryManager.getInstance().getRepository().findLinks()) {
      linkNames.add(link.getName());
    }

    for (Iterator<?> iterator = jobsJsonArray.iterator(); iterator.hasNext(); ) {
      JSONObject jobObject = (JSONObject) iterator.next();
      String linkName = (String) jobObject.get(linkNameKey);
      String jobName = (String) jobObject.get(JSONConstants.NAME);

      if (!linkNames.contains(linkName)) {
        // If a link doesn't exist, remove the jobs relating to it
        iterator.remove();
        LOG.warn("Job " + jobName + " won't be loaded because link " + linkName + " is missing.");
      }
    }
    return jobsJsonArray;
  }

  /**
   * Removes, in place, every submission whose job name does not exist in the repository.
   *
   * @param submissionsJsonArray array of submission JSON objects; null is returned unchanged
   * @return the same (filtered) array
   */
  private JSONArray removeSubmissionIfJobNotExist(JSONArray submissionsJsonArray) {
    if (submissionsJsonArray == null) {
      return null;
    }
    Set<String> jobNames = new HashSet<String>();
    for (MJob job : RepositoryManager.getInstance().getRepository().findJobs()) {
      jobNames.add(job.getName());
    }

    for (Iterator<?> iterator = submissionsJsonArray.iterator(); iterator.hasNext(); ) {
      JSONObject submissionObject = (JSONObject) iterator.next();
      String jobName = (String) submissionObject.get(JSONConstants.JOB_NAME);

      if (!jobNames.contains(jobName)) {
        // If a job doesn't exist, remove the submissions relating to it
        iterator.remove();
        LOG.warn("Submission for " + jobName + " won't be loaded because job " + jobName + " is missing.");
      }
    }
    return submissionsJsonArray;
  }

  /** When set to true, {@link #load(JSONObject)} skips repository/connector/driver initialization. */
  public void setInTest(boolean isInTest) {
    this.isInTest = isInTest;
  }
}