SQOOP-1574: Sqoop2: From/To: Rebase against Sqoop2 branch
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java b/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java
index 288cba3..9bec0d0 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java
@@ -28,9 +28,9 @@
public static final String CONNECTOR_NAME = "connector-name";
public static final String ALL = "all";
public static final String NAME = "name";
- public static final String CONNECTION_ID = "connection-id";
+ public static final String LINK_ID = "link-id";
public static final String JOB_ID = "job-id";
- public static final String CONNECTIONS = "connections";
+ public static final String LINKS = "links";
public static final String JOBS = "jobs";
public static final String SUBMISSIONS = "submissions";
public static final String METADATA = "metadata";
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
index d41b0d2..f89c546 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
@@ -25,25 +25,21 @@
import org.apache.commons.cli.ParseException;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.json.ConnectionBean;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean;
-import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.tools.ConfiguredTool;
import org.apache.sqoop.common.VersionInfo;
-import static org.apache.sqoop.json.util.FormSerialization.ALL;
+import static org.apache.sqoop.json.util.ConfigSerialization.ALL;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
/**
* Write user-created content of Sqoop repository to JSON formatted file
@@ -106,8 +102,8 @@
JSONObject result = new JSONObject();
LOG.info("Dumping Connections with skipSensitive=" + String.valueOf(skipSensitive));
- ConnectionBean connections = new ConnectionBean(repository.findConnections());
- result.put(JSONConstants.CONNECTIONS, addConnectorName(connections.extract(skipSensitive)));
+ LinkBean links = new LinkBean(repository.findLinks());
+ result.put(JSONConstants.LINKS, addConnectorName(links.extract(skipSensitive)));
LOG.info("Dumping Jobs with skipSensitive=" + String.valueOf(skipSensitive));
JobBean jobs = new JobBean(repository.findJobs());
@@ -134,7 +130,6 @@
}
private JSONObject addConnectorName(JSONObject json) {
- Repository repository = RepositoryManager.getInstance().getRepository();
ConnectorManager connectorManager = ConnectorManager.getInstance();
JSONArray results = (JSONArray) json.get(ALL);
@@ -144,7 +139,7 @@
while (iterator.hasNext()) {
JSONObject result = iterator.next();
Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID);
- result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorMetadata(connectorId).getUniqueName());
+ result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfig(connectorId).getUniqueName());
}
return json;
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
index c6124da..76ebd3b 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
@@ -26,24 +26,27 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.Charsets;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.framework.FrameworkManager;
-import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MConnection;
-import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MForm;
-import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.tools.ConfiguredTool;
@@ -58,9 +61,9 @@
import org.apache.commons.io.IOUtils;
import org.apache.sqoop.utils.ClassUtils;
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.apache.sqoop.validation.Validator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -139,28 +142,25 @@
ConnectorManager.getInstance().initialize();
ConnectorManager connectorManager = ConnectorManager.getInstance();
- FrameworkManager.getInstance().initialize();
- FrameworkManager frameworkManager = FrameworkManager.getInstance();
-
LOG.info("Loading Connections");
- JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.CONNECTIONS);
+ JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
if (jsonConns == null) {
- LOG.error("Malformed JSON file. Key "+ JSONConstants.CONNECTIONS + " not found.");
+ LOG.error("Malformed JSON file. Key "+ JSONConstants.LINKS + " not found.");
return false;
}
- ConnectionBean connectionBean = new ConnectionBean();
- connectionBean.restore(updateConnectorIDUsingName(jsonConns));
+ LinkBean linkBean = new LinkBean();
+ linkBean.restore(updateConnectorIDUsingName(jsonConns));
HashMap<Long,Long> connectionIds = new HashMap<Long, Long>();
- for (MConnection connection : connectionBean.getConnections()) {
- long oldId = connection.getPersistenceId();
- long newId = loadConnection(connection);
- if (newId == connection.PERSISTANCE_ID_DEFAULT) {
- LOG.error("loading connection " + connection.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details.");
+ for (MLink link : linkBean.getLinks()) {
+ long oldId = link.getPersistenceId();
+ long newId = loadLink(link);
+ if (newId == link.PERSISTANCE_ID_DEFAULT) {
+ LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details.");
return false;
}
connectionIds.put(oldId,newId);
@@ -176,7 +176,7 @@
}
JobBean jobBean = new JobBean();
- jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.CONNECTION_ID));
+ jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.LINK_ID));
HashMap<Long,Long> jobIds = new HashMap<Long, Long>();
for (MJob job: jobBean.getJobs()) {
@@ -242,117 +242,103 @@
return true;
}
- private long loadConnection(MConnection connection) {
+ private long loadLink(MLink link) {
//starting by pretending we have a brand new connection
- resetPersistenceId(connection);
+ resetPersistenceId(link);
- MetadataUpgrader upgrader = FrameworkManager.getInstance().getMetadataUpgrader();
- MFramework framework = FrameworkManager.getInstance().getFramework();
+ RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
Repository repository = RepositoryManager.getInstance().getRepository();
- List<MForm> frameworkForms = framework.getConnectionForms().clone(false).getForms();
- MConnectionForms newConnectionFrameworkForms = new MConnectionForms(frameworkForms);
-
- MConnector mConnector = ConnectorManager.getInstance().getConnectorMetadata(connection.getConnectorId());
- List<MForm> connectorForms = mConnector.getConnectionForms().clone(false).getForms();
- MConnectionForms newConnectionConnectorForms = new MConnectionForms(connectorForms);
+ MConnector mConnector = ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId());
+ List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
+ MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs);
// upgrading the forms to make sure they match the current repository
- upgrader.upgrade(connection.getFrameworkPart(), newConnectionFrameworkForms);
- upgrader.upgrade(connection.getConnectorPart(), newConnectionConnectorForms);
- MConnection newConnection = new MConnection(connection, newConnectionConnectorForms, newConnectionFrameworkForms);
+ upgrader.upgrade(link.getConnectorLinkConfig(), newLinkConfigs);
+ MLink newLink = new MLink(link, newLinkConfigs);
- // Transform form structures to objects for validations
+ // Transform config structures to objects for validations
SqoopConnector connector =
- ConnectorManager.getInstance().getConnector(connection.getConnectorId());
+ ConnectorManager.getInstance().getConnector(link.getConnectorId());
Object connectorConfig = ClassUtils.instantiate(
- connector.getConnectionConfigurationClass());
- Object frameworkConfig = ClassUtils.instantiate(
- FrameworkManager.getInstance().getConnectionConfigurationClass());
+ connector.getLinkConfigurationClass());
- FormUtils.fromForms(
- connection.getConnectorPart().getForms(), connectorConfig);
- FormUtils.fromForms(
- connection.getFrameworkPart().getForms(), frameworkConfig);
+ ConfigUtils.fromConfigs(
+ link.getConnectorLinkConfig().getConfigs(), connectorConfig);
- Validator connectorValidator = connector.getValidator();
- Validator frameworkValidator = FrameworkManager.getInstance().getValidator();
+ ConfigValidationRunner validationRunner = new ConfigValidationRunner();
+ ConfigValidationResult result = validationRunner.validate(connectorConfig);
- Validation connectorValidation =
- connectorValidator.validateConnection(connectorConfig);
- Validation frameworkValidation =
- frameworkValidator.validateConnection(frameworkConfig);
-
- Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(),
- frameworkValidation.getStatus());
+ Status finalStatus = Status.getWorstStatus(result.getStatus());
if (finalStatus.canProceed()) {
- repository.createConnection(newConnection);
+ repository.createLink(newLink);
} else {
- LOG.error("Failed to load connection:" + connection.getName());
- LOG.error("Status of connector forms:" + connectorValidation.getStatus().toString());
- LOG.error("Status of framework forms:" + frameworkValidation.getStatus().toString());
+ LOG.error("Failed to load link:" + link.getName());
+ LOG.error("Status of connector configs:" + result.getStatus().toString());
}
- return newConnection.getPersistenceId();
+ return newLink.getPersistenceId();
}
private long loadJob(MJob job) {
//starting by pretending we have a brand new job
resetPersistenceId(job);
-
- MetadataUpgrader upgrader = FrameworkManager.getInstance().getMetadataUpgrader();
- MFramework framework = FrameworkManager.getInstance().getFramework();
+ RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
+ MDriver driver = Driver.getInstance().getDriver();
Repository repository = RepositoryManager.getInstance().getRepository();
- MJob.Type jobType = job.getType();
- List<MForm> frameworkForms = framework.getJobForms(job.getType()).clone(false).getForms();
- MJobForms newJobFrameworkForms = new MJobForms(jobType,frameworkForms);
+ MDriverConfig driverConfigs = driver.getDriverConfig();
+ MFromConfig fromConfigs = job.getFromJobConfig();
+ MToConfig toConfigs = job.getToJobConfig();
- MConnector mConnector = ConnectorManager.getInstance().getConnectorMetadata(job.getConnectorId());
- List<MForm> connectorForms = mConnector.getJobForms(jobType).clone(false).getForms();
- MJobForms newJobConnectorForms = new MJobForms(jobType,connectorForms);
+ // upgrading the configs to make sure they match the current repository
+ upgrader.upgrade(job.getDriverConfig(), driverConfigs);
+ upgrader.upgrade(job.getFromJobConfig(), fromConfigs);
+ upgrader.upgrade(job.getToJobConfig(), toConfigs);
+ MJob newJob = new MJob(job, fromConfigs, toConfigs, driverConfigs);
- // upgrading the forms to make sure they match the current repository
- upgrader.upgrade(job.getFrameworkPart(), newJobFrameworkForms);
- upgrader.upgrade(job.getConnectorPart(), newJobConnectorForms);
- MJob newJob = new MJob(job, newJobConnectorForms, newJobFrameworkForms);
+ // Transform config structures to objects for validations
+ SqoopConnector fromConnector =
+ ConnectorManager.getInstance().getConnector(
+ job.getConnectorId(Direction.FROM));
+ SqoopConnector toConnector =
+ ConnectorManager.getInstance().getConnector(
+ job.getConnectorId(Direction.TO));
- // Transform form structures to objects for validations
- SqoopConnector connector =
- ConnectorManager.getInstance().getConnector(job.getConnectorId());
+ Object fromConnectorConfig = ClassUtils.instantiate(
+ fromConnector.getJobConfigurationClass(Direction.FROM));
+ Object toConnectorConfig = ClassUtils.instantiate(
+ toConnector.getJobConfigurationClass(Direction.TO));
+ Object driverConfig = ClassUtils.instantiate(
+ Driver.getInstance().getDriverConfigurationGroupClass());
- Object connectorConfig = ClassUtils.instantiate(
- connector.getJobConfigurationClass(jobType));
- Object frameworkConfig = ClassUtils.instantiate(
- FrameworkManager.getInstance().getJobConfigurationClass(jobType));
+ ConfigUtils.fromConfigs(
+ job.getFromJobConfig().getConfigs(), fromConnectorConfig);
+ ConfigUtils.fromConfigs(
+ job.getToJobConfig().getConfigs(), toConnectorConfig);
+ ConfigUtils.fromConfigs(
+ job.getDriverConfig().getConfigs(), driverConfig);
- FormUtils.fromForms(
- job.getConnectorPart().getForms(), connectorConfig);
- FormUtils.fromForms(
- job.getFrameworkPart().getForms(), frameworkConfig);
+ ConfigValidationRunner validationRunner = new ConfigValidationRunner();
+ ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig);
+ ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig);
+ ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig);
- Validator connectorValidator = connector.getValidator();
- Validator frameworkValidator = FrameworkManager.getInstance().getValidator();
-
- Validation connectorValidation =
- connectorValidator.validateJob(jobType,connectorConfig);
- Validation frameworkValidation =
- frameworkValidator.validateJob(jobType,frameworkConfig);
-
- Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(),
- frameworkValidation.getStatus());
+ Status finalStatus = Status.getWorstStatus(fromConnectorConfigResult.getStatus(),
+ toConnectorConfigResult.getStatus(), driverConfigResult.getStatus());
if (finalStatus.canProceed()) {
repository.createJob(newJob);
} else {
LOG.error("Failed to load job:" + job.getName());
- LOG.error("Status of connector forms:" + connectorValidation.getStatus().toString());
- LOG.error("Status of framework forms:" + frameworkValidation.getStatus().toString());
+ LOG.error("Status of from connector configs:" + fromConnectorConfigResult.getStatus().toString());
+ LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString());
+ LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString());
}
return newJob.getPersistenceId();