/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.oozie;

import com.google.common.base.Splitter;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;

import javax.xml.bind.JAXBElement;

import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

/**
 * Builds the datasource import workflow for Oozie.
 */
public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
    protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml";
    protected static final String IMPORT_ACTION_NAME = "db-import-sqoop";

    public DatabaseImportWorkflowBuilder(Feed entity) {
        super(entity);
    }
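
    /**
     * Identifies the entity operation this workflow performs: IMPORT.
     */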
    @Override
    protected WorkflowExecutionContext.EntityOperations getOperation() {
        return WorkflowExecutionContext.EntityOperations.IMPORT;
    }
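
    /**
     * Assembles the import workflow: unmarshals the Sqoop action template,
     * wires in HCatalog properties where the feed uses table storage, adds
     * post-processing and lib extensions, and exposes the generated Sqoop
     * command to the action through the returned properties.
     */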
    @Override
    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
        throws FalconException {
        ACTION action = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
        JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action);
        org.apache.falcon.oozie.sqoop.ACTION sqoopImport = actionJaxbElement.getValue();

        Properties props = new Properties();
        ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopImport);
        OozieUtils.marshalSqoopAction(action, actionJaxbElement);

        addPostProcessing(workflow, action);
        decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
        addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);

        // Build the Sqoop command and hand it to the action via the workflow properties.
        String sqoopCmd = buildSqoopCommand(cluster, entity);
        LOG.info("Sqoop import command: {}", sqoopCmd);
        props.put("sqoopCommand", sqoopCmd);
        return props;
    }
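
    /**
     * Builds the Sqoop command line from the feed's import policy: connection
     * manager, driver, connect string, source table, credentials, mapper count,
     * any extra arguments, and finally the import target, all joined with
     * {@code ImportExportCommon.ARG_SEPARATOR}. An illustrative result (the
     * connect string and table name here are invented, not taken from this code):
     *
     * <pre>
     * import --connect jdbc:mysql://db-host/salesdb --table orders --num-mappers 1
     *     --delete-target-dir --target-dir ${coord:dataOut('...')}
     * </pre>
     */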
    private String buildSqoopCommand(Cluster cluster, Feed feed) throws FalconException {
        Map<String, String> extraArgs = getArguments(cluster);
        StringBuilder sqoopArgs = new StringBuilder();
        StringBuilder sqoopOptions = new StringBuilder();

        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
        Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster));
        ImportExportCommon.buildConnectionManagerArg(sqoopArgs, datasource);
        ImportExportCommon.buildDriverArgs(sqoopArgs, datasource).append(ImportExportCommon.ARG_SEPARATOR);
        ImportExportCommon.buildConnectArg(sqoopArgs, DatasourceHelper.getReadOnlyEndpoint(datasource))
            .append(ImportExportCommon.ARG_SEPARATOR);
        ImportExportCommon.buildTableArg(sqoopArgs, FeedHelper.getImportDataSourceTableName(feedCluster))
            .append(ImportExportCommon.ARG_SEPARATOR);
        ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions,
            DatasourceHelper.getReadPasswordInfo(datasource)).append(ImportExportCommon.ARG_SEPARATOR);
        buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
        buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
        buildTargetArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);

        StringBuilder sqoopCmd = new StringBuilder();
        return sqoopCmd.append("import").append(ImportExportCommon.ARG_SEPARATOR)
            .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
            .append(sqoopArgs).toString();
    }
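
    /**
     * Appends the import target: an HCatalog table when the feed uses table
     * storage, otherwise a filesystem target directory.
     */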
    private StringBuilder buildTargetArg(StringBuilder builder, Feed feed, Cluster cluster)
        throws FalconException {
        Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
        if (feedStorageType == Storage.TYPE.TABLE) {
            return buildTargetTableArg(builder, feed.getTable());
        } else {
            return buildTargetDirArg(builder);
        }
    }
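
    /**
     * Appends {@code --delete-target-dir --target-dir <path>}, where the path
     * is resolved by Oozie from the coordinator's data-out instance at run time.
     */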
    private StringBuilder buildTargetDirArg(StringBuilder builder)
        throws FalconException {
        return builder.append("--delete-target-dir").append(ImportExportCommon.ARG_SEPARATOR)
            .append("--target-dir").append(ImportExportCommon.ARG_SEPARATOR)
            .append(String.format("${coord:dataOut('%s')}",
                FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
    }
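
    /**
     * Appends each user-supplied extra argument as a key/value pair, e.g.
     * {@code --split-by id} (illustrative; the actual pairs come from the
     * feed's import arguments).
     */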
    private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
        throws FalconException {
        for (Map.Entry<String, String> e : extraArgs.entrySet()) {
            builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue())
                .append(ImportExportCommon.ARG_SEPARATOR);
        }
        return builder;
    }

    /**
     * Defaults {@code --num-mappers} to 1 when it is not supplied. Feed
     * validation separately ensures that a {@code --split-by} column is
     * supplied whenever {@code --num-mappers} is greater than 1.
     *
     * @param builder the Sqoop arguments being assembled
     * @param extraArgs map of extra arguments from the feed's import policy
     * @return the builder, for chaining
     */
    private StringBuilder buildNumMappers(StringBuilder builder, Map<String, String> extraArgs) {
        if (!extraArgs.containsKey("--num-mappers")) {
            builder.append("--num-mappers").append(ImportExportCommon.ARG_SEPARATOR).append(1);
        }
        return builder;
    }
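
    /**
     * Fetches the extra import arguments configured on the feed's cluster entry.
     */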
    private Map<String, String> getArguments(Cluster cluster) throws FalconException {
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
        return FeedHelper.getImportArguments(feedCluster);
    }
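
    /**
     * Appends the HCatalog target arguments. The catalog URI is expected in the
     * form {@code catalog:<database>:<table>#<partition-key=value,...>}, e.g.
     * {@code catalog:salesdb:orders#ds=${YEAR}-${MONTH}-${DAY}} (an illustrative
     * URI, not one taken from this code). Partition keys and values are emitted
     * as comma-separated lists that the coordinator resolves at run time.
     */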
    private StringBuilder buildTargetTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException {
        LOG.info("Catalog URI {}", catalog.getUri());
        builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR);

        // URI format: catalog:<database>:<table>#<partitions>
        Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator();
        String dbTable = itr.next();
        String partitions = itr.next();
        Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
        itrDbTable.next(); // skip the "catalog" scheme prefix
        String db = itrDbTable.next();
        String table = itrDbTable.next();
        LOG.debug("Target database {}, table {}", db, table);

        builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR)
            .append(String.format("${coord:databaseOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
            .append(ImportExportCommon.ARG_SEPARATOR);
        builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR)
            .append(String.format("${coord:tableOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
            .append(ImportExportCommon.ARG_SEPARATOR);

        Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions);
        if (!partitionsMap.isEmpty()) {
            StringBuilder partitionKeys = new StringBuilder();
            StringBuilder partitionValues = new StringBuilder();
            for (Map.Entry<String, String> e : partitionsMap.entrySet()) {
                partitionKeys.append(e.getKey());
                partitionKeys.append(',');
                partitionValues.append(String.format("${coord:dataOutPartitionValue('%s','%s')}",
                    FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME,
                    e.getKey()));
                partitionValues.append(',');
            }
            // Strip the trailing comma from both lists; the map is known to be non-empty here.
            partitionKeys.setLength(partitionKeys.length() - 1);
            partitionValues.setLength(partitionValues.length() - 1);
            LOG.debug("partitionKeys {} and partitionValues {}", partitionKeys.toString(), partitionValues.toString());
            builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR)
                .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR);
            builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR)
                .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR);
        }
        return builder;
    }
}