/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
/**
* Base tool for running MapReduce-based ingests of data.
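*
* <p>Concrete subclasses (such as {@code CsvBulkLoadTool}) are normally launched via
* {@code hadoop jar}, but can also be driven programmatically. A minimal sketch, with an
* illustrative table name and input path, might look like:</p>
*
* <pre>
* int exitCode = ToolRunner.run(HBaseConfiguration.create(), new CsvBulkLoadTool(),
* new String[] { "--table", "EXAMPLE", "--input", "/data/example.csv" });
* </pre>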
*/
public abstract class AbstractBulkLoadTool extends Configured implements Tool {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class);
static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path(s) (comma-separated, mandatory)");
static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when loading only this particular index table");
static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
/**
* Set configuration values based on parsed command line options.
*
* @param cmdLine supplied command line options
* @param importColumns descriptors of columns to be imported
* @param conf job configuration
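* @throws SQLException if an error occurs while reading table metadata through Phoenix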
*/
protected abstract void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
Configuration conf) throws SQLException;
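/**
* Hook allowing subclasses to perform any additional job configuration (for example, setting
* the mapper class) before the job is submitted.
*
* @param job the MapReduce job to be customized
*/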
protected abstract void setupJob(Job job);
protected Options getOptions() {
Options options = new Options();
options.addOption(INPUT_PATH_OPT);
options.addOption(TABLE_NAME_OPT);
options.addOption(INDEX_TABLE_NAME_OPT);
options.addOption(ZK_QUORUM_OPT);
options.addOption(OUTPUT_PATH_OPT);
options.addOption(SCHEMA_NAME_OPT);
options.addOption(IMPORT_COLUMNS_OPT);
options.addOption(IGNORE_ERRORS_OPT);
options.addOption(HELP_OPT);
options.addOption(SKIP_HEADER_OPT);
return options;
}
/**
* Parses the command line arguments, throwing an IllegalStateException if mandatory arguments are
* missing.
*
* @param args supplied command line arguments
* @return the parsed command line
*/
protected CommandLine parseOptions(String[] args) {
Options options = getOptions();
CommandLineParser parser = new PosixParser();
CommandLine cmdLine = null;
try {
cmdLine = parser.parse(options, args);
} catch (ParseException e) {
printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
}
if (cmdLine.hasOption(HELP_OPT.getOpt())) {
printHelpAndExit(options, 0);
}
if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " +
"parameter");
}
if (!cmdLine.getArgList().isEmpty()) {
throw new IllegalStateException("Got unexpected extra parameters: "
+ cmdLine.getArgList());
}
if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " +
"parameter");
}
return cmdLine;
}
private void printHelpAndExit(String errorMessage, Options options) {
System.err.println(errorMessage);
printHelpAndExit(options, 1);
}
private void printHelpAndExit(Options options, int exitCode) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("help", options);
System.exit(exitCode);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(getConf());
CommandLine cmdLine = null;
try {
cmdLine = parseOptions(args);
} catch (IllegalStateException e) {
printHelpAndExit(e.getMessage(), getOptions());
}
return loadData(conf, cmdLine);
}
/**
* Checks whether a schema or table name starts with two double quotes, e.g. ""t"" -> true.
*/
private boolean isStartWithTwoDoubleQuotes(String name) {
boolean start = false;
boolean end = false;
if (name != null && name.length() > 1) {
int length = name.length();
start = name.substring(0, 2).equals("\"\"");
end = name.substring(length - 2, length).equals("\"\"");
if (start && !end) {
throw new IllegalArgumentException("Invalid table/schema name " + name +
". A name that starts with two double quotes must also end with two double quotes.");
}
}
return start;
}
private int loadData(Configuration conf, CommandLine cmdLine) throws Exception {
String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
boolean quotedTableName = isStartWithTwoDoubleQuotes(tableName);
if (quotedTableName) {
// Commons CLI cannot parse a fully quoted argument such as "t" (CLI-275).
// If \"\"t\"\" is passed, both pairs of quotes are left intact as ""t"",
// so strip one pair of quotes from the table name: ""t"" -> "t".
tableName = tableName.substring(1, tableName.length() - 1);
}
boolean quotedSchemaName = isStartWithTwoDoubleQuotes(schemaName);
if (quotedSchemaName) {
schemaName = schemaName.substring(1, schemaName.length() - 1);
}
String qualifiedTableName = SchemaUtil.getQualifiedTableName(schemaName, tableName);
String qualifiedIndexTableName = null;
if (indexTableName != null) {
qualifiedIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
}
if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
// ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
LOG.info("Configuring HBase connection to {}", info);
for (Map.Entry<String,String> entry : info.asProps()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
}
conf.set(entry.getKey(), entry.getValue());
}
}
// Skip the first line of the CSV file(s)?
if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
PhoenixTextInputFormat.setSkipHeader(conf);
}
final Connection conn = QueryUtil.getConnection(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
qualifiedTableName);
}
List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
Preconditions.checkNotNull(importColumns);
Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty");
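// Serialize the column metadata into the job configuration so that the mapper can rebuild it.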
FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns);
boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY,
SchemaUtil.getEscapedFullTableName(qualifiedTableName));
// give subclasses their hook
configureOptions(cmdLine, importColumns, conf);
String sName = SchemaUtil.normalizeIdentifier(schemaName);
String tName = SchemaUtil.normalizeIdentifier(tableName);
final String inputPaths = cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt());
final Path outputPath;
if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
} else {
// Default to a random temporary directory when no output path is supplied.
outputPath = new Path("/tmp/" + UUID.randomUUID());
}
List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
boolean hasLocalIndexes = false;
// Keep the connection open until all table metadata has been read, then close it.
try {
validateTable(conn, sName, tName);
PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
// Determine whether the load involves a local index; if it does, region splits that happen
// while the job runs must be detected after it completes.
for (PTable index : table.getIndexes()) {
if (index.getIndexType() == IndexType.LOCAL) {
hasLocalIndexes =
qualifiedIndexTableName == null ? true : index.getTableName().getString()
.equals(qualifiedIndexTableName);
if (hasLocalIndexes) break;
}
}
tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
} finally {
conn.close();
}
// When loading a single index table, verify that the supplied index table name exists.
if (qualifiedIndexTableName != null) {
TargetTableRef targetIndexRef = null;
for (TargetTableRef tmpTable : tablesToBeLoaded) {
if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
targetIndexRef = tmpTable;
break;
}
}
if (targetIndexRef == null) {
throw new IllegalStateException("Bulk Loader error: index table " +
qualifiedIndexTableName + " doesn't exist");
}
tablesToBeLoaded.clear();
tablesToBeLoaded.add(targetIndexRef);
}
return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded, hasLocalIndexes);
}
/**
* Submits the MapReduce job to the cluster and, on success, loads the generated HFiles into
* the target tables.
* @throws Exception
*/
public int submitJob(final Configuration conf, final String qualifiedTableName,
final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded, boolean hasLocalIndexes) throws Exception {
Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
FileInputFormat.addInputPaths(job, inputPaths);
FileOutputFormat.setOutputPath(job, outputPath);
job.setInputFormatClass(PhoenixTextInputFormat.class);
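// The map output is keyed by (table name, row key) so that a single job can produce HFiles
// for the data table and all of its index tables.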
job.setMapOutputKeyClass(TableRowkeyPair.class);
job.setMapOutputValueClass(ImmutableBytesWritable.class);
job.setOutputKeyClass(TableRowkeyPair.class);
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(FormatToKeyValueReducer.class);
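// Capture the table's region start keys before the job runs so that a region split occurring
// during the bulk load can be detected afterwards (see the split key check below).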
byte[][] splitKeysBeforeJob = null;
HTable table = null;
if (hasLocalIndexes) {
try {
table = new HTable(job.getConfiguration(), qualifiedTableName);
splitKeysBeforeJob = table.getRegionLocator().getStartKeys();
} finally {
if (table != null) table.close();
}
}
MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
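// Publish the physical and logical names of all target tables to the job configuration as
// JSON so that the mapper and reducer can resolve them.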
final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY, tableNamesAsJson);
job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY, logicalNamesAsJson);
// give subclasses their hook
setupJob(job);
LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
boolean success = job.waitForCompletion(true);
if (success) {
if (hasLocalIndexes) {
try {
table = new HTable(job.getConfiguration(), qualifiedTableName);
if (!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getRegionLocator().getStartKeys())) {
LOG.error("The table "
+ qualifiedTableName
+ " has local indexes and its region split keys changed while the bulk load"
+ " job was running. Please rerun the job; otherwise the table data and its"
+ " index data may be inconsistent.");
return -1;
}
} finally {
if (table != null) table.close();
}
}
LOG.info("Loading HFiles from {}", outputPath);
completebulkload(conf, outputPath, tablesToBeLoaded);
LOG.info("Removing output directory {}", outputPath);
if (!outputPath.getFileSystem(conf).delete(outputPath, true)) {
LOG.error("Failed to delete the output directory {}", outputPath);
}
return 0;
} else {
return -1;
}
}
private void completebulkload(Configuration conf, Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
Set<String> tableNames = new HashSet<>(tablesToBeLoaded.size());
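// Several logical tables (for example, a data table and its local indexes) may share the same
// physical table, so load the HFiles for each physical table only once.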
for (TargetTableRef table : tablesToBeLoaded) {
if (tableNames.contains(table.getPhysicalName())) {
continue;
}
tableNames.add(table.getPhysicalName());
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String tableName = table.getPhysicalName();
Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
try (HTable htable = new HTable(conf, tableName)) {
LOG.info("Loading HFiles for {} from {}", tableName, tableOutputPath);
loader.doBulkLoad(tableOutputPath, htable);
LOG.info("Incremental load complete for table {}", tableName);
}
}
}
/**
* Build up the list of columns to be imported. The list is taken from the command line if
* present, otherwise it is taken from the table description.
*
* @param conn connection to Phoenix
* @param cmdLine supplied command line options
* @param qualifiedTableName table name (possibly with schema) of the table to be imported
* @return the list of columns to be imported
*/
List<ColumnInfo> buildImportColumns(Connection conn, CommandLine cmdLine,
String qualifiedTableName) throws SQLException {
List<String> userSuppliedColumnNames = null;
if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
userSuppliedColumnNames = Lists.newArrayList(
Splitter.on(",").trimResults().split
(cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
}
return SchemaUtil.generateColumnInfo(
conn, qualifiedTableName, userSuppliedColumnNames, true);
}
/**
* Perform any required validation on the table being bulk loaded into:
* - ensure that no column family name starts with '_', as such families would be ignored, leading to problems.
* @throws java.sql.SQLException
*/
private void validateTable(Connection conn, String schemaName,
String tableName) throws SQLException {
ResultSet rs = conn.getMetaData().getColumns(
null, StringUtil.escapeLike(schemaName),
StringUtil.escapeLike(tableName), null);
while (rs.next()) {
String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
if (familyName != null && familyName.startsWith("_")) {
if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
throw new IllegalStateException(
"Bulk Loader error: All column names that are not part of the " +
"primary key constraint must be prefixed with a column family " +
"name (i.e. f.my_column VARCHAR)");
} else {
throw new IllegalStateException("Bulk Loader error: Column family name " +
"must not start with '_': " + familyName);
}
}
}
rs.close();
}
/**
* Get the index tables of the given data table.
* @throws java.sql.SQLException
*/
private List<TargetTableRef> getIndexTables(Connection conn, String qualifiedTableName)
throws SQLException {
PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
for (PTable indexTable : table.getIndexes()) {
indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable
.getPhysicalName().getString()));
}
return indexTables;
}
}