| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.mapreduce.index; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.util.List; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.commons.cli.PosixParser; |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.mapreduce.CsvBulkImportUtil; |
| import org.apache.phoenix.mapreduce.util.ConnectionUtil; |
| import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; |
| import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; |
| import org.apache.phoenix.parse.HintNode.Hint; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
| |
| /** |
| * An MR job to verify that the index table is in sync with the data table. |
| * |
| */ |
| public class IndexScrutinyTool extends Configured implements Tool { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(IndexScrutinyTool.class); |
| |
| private static final Option SCHEMA_NAME_OPTION = |
| new Option("s", "schema", true, "Phoenix schema name (optional)"); |
| private static final Option DATA_TABLE_OPTION = |
| new Option("dt", "data-table", true, "Data table name (mandatory)"); |
| private static final Option INDEX_TABLE_OPTION = |
| new Option("it", "index-table", true, |
| "Index table name (mandatory)."); |
| private static final Option TIMESTAMP = |
| new Option("t", "time", true, |
| "Timestamp in millis used to compare the index and data tables. Defaults to current time minus 60 seconds"); |
| |
    private static final Option RUN_FOREGROUND_OPTION =
            new Option("runfg", "run-foreground", false,
                    "If specified, runs index scrutiny in the foreground and waits for the jobs"
                            + " to complete. Default - runs the jobs in the background.");
| |
    private static final Option SNAPSHOT_OPTION = //TODO check if this works
            new Option("snap", "snapshot", false,
                    "If specified, runs the scrutiny over an HBase snapshot of the data table (optional)");
| |
| public static final Option BATCH_SIZE_OPTION = |
| new Option("b", "batch-size", true, "Number of rows to compare at a time"); |
| public static final Option SOURCE_TABLE_OPTION = |
| new Option("src", "source", true, |
| "Table to use as the source table, whose rows are iterated over and compared to the other table." |
| + " Options are DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE, BOTH." |
| + " Defaults to BOTH, which does two separate jobs to iterate over both tables"); |
| |
| private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); |
| |
| private static final Option OUTPUT_INVALID_ROWS_OPTION = |
| new Option("o", "output", false, "Whether to output invalid rows"); |
| private static final Option OUTPUT_FORMAT_OPTION = |
| new Option("of", "output-format", true, |
| "Format in which to output invalid rows. Options are FILE, TABLE. Defaults to TABLE"); |
| private static final Option OUTPUT_PATH_OPTION = |
| new Option("op", "output-path", true, "Output path where the files are written"); |
    private static final Option OUTPUT_MAX =
            new Option("om", "output-max", true,
                    "Max number of invalid rows to output per mapper. Defaults to 1M");
| private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true, |
| "If specified, uses Tenant connection for tenant view index scrutiny (optional)"); |
| public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]"; |
| |
| /** |
| * Which table to use as the source table |
| */ |
    public enum SourceTable {
        DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE,
        /**
         * Runs two separate jobs to iterate over both tables
         */
        BOTH
    }
| |
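    /** Format to which invalid rows are written when output is enabled. */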
    public enum OutputFormat {
        FILE, TABLE
    }
| |
    private final List<Job> jobs = Lists.newArrayList();
| |
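    /** Builds the set of command line options understood by the tool. */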
| private Options getOptions() { |
| final Options options = new Options(); |
| options.addOption(SCHEMA_NAME_OPTION); |
| options.addOption(DATA_TABLE_OPTION); |
| options.addOption(INDEX_TABLE_OPTION); |
| options.addOption(RUN_FOREGROUND_OPTION); |
| options.addOption(OUTPUT_INVALID_ROWS_OPTION); |
| options.addOption(OUTPUT_FORMAT_OPTION); |
| options.addOption(OUTPUT_PATH_OPTION); |
| options.addOption(OUTPUT_MAX); |
| options.addOption(SNAPSHOT_OPTION); |
| options.addOption(HELP_OPTION); |
| options.addOption(TIMESTAMP); |
| options.addOption(BATCH_SIZE_OPTION); |
| options.addOption(SOURCE_TABLE_OPTION); |
| options.addOption(TENANT_ID_OPTION); |
| return options; |
| } |
| |
| /** |
| * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are |
| * missing. |
| * @param args supplied command line arguments |
| * @return the parsed command line |
| */ |
| private CommandLine parseOptions(String[] args) { |
| final Options options = getOptions(); |
| |
| CommandLineParser parser = new PosixParser(); |
| CommandLine cmdLine = null; |
| try { |
| cmdLine = parser.parse(options, args); |
| } catch (ParseException e) { |
| printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); |
| } |
| |
| if (cmdLine.hasOption(HELP_OPTION.getOpt())) { |
| printHelpAndExit(options, 0); |
| } |
| |
| requireOption(cmdLine, DATA_TABLE_OPTION); |
| requireOption(cmdLine, INDEX_TABLE_OPTION); |
| |
| return cmdLine; |
| } |
| |
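    /** Throws IllegalStateException if the given mandatory option was not supplied. */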
| private void requireOption(CommandLine cmdLine, Option option) { |
| if (!cmdLine.hasOption(option.getOpt())) { |
| throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter"); |
| } |
| } |
| |
| private void printHelpAndExit(String errorMessage, Options options) { |
| System.err.println(errorMessage); |
| printHelpAndExit(options, 1); |
| } |
| |
| private void printHelpAndExit(Options options, int exitCode) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("help", options); |
| System.exit(exitCode); |
| } |
| |
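    /**
     * Builds the MapReduce jobs that run the scrutiny. All jobs created from a single
     * factory share the same configuration and scrutiny execute timestamp.
     */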
| class JobFactory { |
| Connection connection; |
| Configuration configuration; |
| private boolean useSnapshot; |
| private long ts; |
| private boolean outputInvalidRows; |
| private OutputFormat outputFormat; |
| private String basePath; |
| private long scrutinyExecuteTime; |
| private long outputMaxRows; // per mapper |
| private String tenantId; |
| |
| public JobFactory(Connection connection, Configuration configuration, long batchSize, |
| boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat, |
| String basePath, long outputMaxRows, String tenantId) { |
| this.outputInvalidRows = outputInvalidRows; |
| this.outputFormat = outputFormat; |
| this.basePath = basePath; |
| this.outputMaxRows = outputMaxRows; |
| PhoenixConfigurationUtil.setScrutinyBatchSize(configuration, batchSize); |
| this.connection = connection; |
| this.configuration = configuration; |
| this.useSnapshot = useSnapshot; |
| this.tenantId = tenantId; |
            this.ts = ts; // CURRENT_SCN to set
            // the time at which the scrutiny was run; the same for all jobs created from this factory
            scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis();
| PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration, |
| scrutinyExecuteTime); |
| if (!Strings.isNullOrEmpty(tenantId)) { |
| PhoenixConfigurationUtil.setTenantId(configuration, tenantId); |
| } |
| } |
| |
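        /**
         * Creates a scrutiny job that iterates over the given source table (data or index)
         * and compares each row against its counterpart in the target table.
         */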
| public Job createSubmittableJob(String schemaName, String indexTable, String dataTable, |
| SourceTable sourceTable) throws Exception { |
| Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) |
| || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable)); |
| |
| final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); |
| final String qIndexTable; |
| if (schemaName != null && !schemaName.isEmpty()) { |
| qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); |
| } else { |
| qIndexTable = indexTable; |
| } |
| PhoenixConfigurationUtil.setScrutinyDataTable(configuration, qDataTable); |
| PhoenixConfigurationUtil.setScrutinyIndexTable(configuration, qIndexTable); |
| PhoenixConfigurationUtil.setScrutinySourceTable(configuration, sourceTable); |
| PhoenixConfigurationUtil.setScrutinyOutputInvalidRows(configuration, outputInvalidRows); |
| PhoenixConfigurationUtil.setScrutinyOutputMax(configuration, outputMaxRows); |
| |
| final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); |
| final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); |
| |
| // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny |
| configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts)); |
| |
| // set the source table to either data or index table |
| SourceTargetColumnNames columnNames = |
| SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) |
| ? new SourceTargetColumnNames.DataSourceColNames(pdataTable, |
| pindexTable) |
| : new SourceTargetColumnNames.IndexSourceColNames(pdataTable, |
| pindexTable); |
| String qSourceTable = columnNames.getQualifiedSourceTableName(); |
| List<String> sourceColumnNames = columnNames.getSourceColNames(); |
| List<String> sourceDynamicCols = columnNames.getSourceDynamicCols(); |
| List<String> targetDynamicCols = columnNames.getTargetDynamicCols(); |
| |
            // Set up the select query against the source - we either select the index columns
            // from the index table, or select the data table equivalents of the index columns
            // from the data table
| final String selectQuery = |
| QueryUtil.constructSelectStatement(qSourceTable, sourceColumnNames, null, |
| Hint.NO_INDEX, true); |
| LOGGER.info("Query used on source table to feed the mapper: " + selectQuery); |
| |
| PhoenixConfigurationUtil.setScrutinyOutputFormat(configuration, outputFormat); |
| // if outputting to table, setup the upsert to the output table |
| if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) { |
| String upsertStmt = |
| IndexScrutinyTableOutput.constructOutputTableUpsert(sourceDynamicCols, |
| targetDynamicCols, connection); |
| PhoenixConfigurationUtil.setUpsertStatement(configuration, upsertStmt); |
| LOGGER.info("Upsert statement used for output table: " + upsertStmt); |
| } |
| |
| final String jobName = |
| String.format(INDEX_JOB_NAME_TEMPLATE, qSourceTable, |
| columnNames.getQualifiedTargetTableName()); |
| final Job job = Job.getInstance(configuration, jobName); |
| |
| if (!useSnapshot) { |
| PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, |
| selectQuery); |
| } else { // TODO check if using a snapshot works |
                try (HBaseAdmin admin = connection.unwrap(PhoenixConnection.class)
                        .getQueryServices().getAdmin()) {
                    String pdataTableName = pdataTable.getName().getString();
                    String snapshotName = pdataTableName + "-Snapshot";
                    admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
                }
                // the snapshot restore root dir must not be a subdirectory of the hbase root dir
| Path rootDir = new Path("hdfs:///index-snapshot-dir"); |
| FSUtils.setRootDir(configuration, rootDir); |
| Path restoreDir = new Path(FSUtils.getRootDir(configuration), "restore-dir"); |
| |
                // set input for the map reduce job using hbase snapshots
                // TODO enable once snapshot-based input is supported; as written, the snapshot
                // path does not configure any input for the job
                //PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName,
                //    qDataTable, restoreDir, selectQuery);
| } |
| TableMapReduceUtil.initCredentials(job); |
| Path outputPath = |
| getOutputPath(configuration, basePath, |
| SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) ? pdataTable |
| : pindexTable); |
| |
| return configureSubmittableJob(job, outputPath); |
| } |
| |
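        /**
         * Applies the settings common to all scrutiny jobs: the job is map-only (no
         * reducers), and file output is configured only when invalid rows are written
         * to files.
         */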
| private Job configureSubmittableJob(Job job, Path outputPath) throws Exception { |
| Configuration conf = job.getConfiguration(); |
| conf.setBoolean("mapreduce.job.user.classpath.first", true); |
| HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); |
| job.setJarByClass(IndexScrutinyTool.class); |
| job.setOutputFormatClass(NullOutputFormat.class); |
| if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) { |
| job.setOutputFormatClass(TextOutputFormat.class); |
| FileOutputFormat.setOutputPath(job, outputPath); |
| } |
| job.setMapperClass(IndexScrutinyMapper.class); |
| job.setNumReduceTasks(0); |
| // Set the Output classes |
| job.setMapOutputKeyClass(Text.class); |
| job.setMapOutputValueClass(Text.class); |
| TableMapReduceUtil.addDependencyJars(job); |
| return job; |
| } |
| |
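        /**
         * Returns the output path for the given table under the base path, deleting any
         * previous output at that location, or null if no base path was supplied.
         */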
| Path getOutputPath(final Configuration configuration, String basePath, PTable table) |
| throws IOException { |
| Path outputPath = null; |
| FileSystem fs; |
| if (basePath != null) { |
| outputPath = |
| CsvBulkImportUtil.getOutputPath(new Path(basePath), |
| table.getPhysicalName().getString()); |
| fs = outputPath.getFileSystem(configuration); |
| fs.delete(outputPath, true); |
| } |
| return outputPath; |
| } |
| } |
| |
| @Override |
| public int run(String[] args) throws Exception { |
| Connection connection = null; |
| try { |
            // start - parse command line configs
| CommandLine cmdLine = null; |
| try { |
| cmdLine = parseOptions(args); |
| } catch (IllegalStateException e) { |
| printHelpAndExit(e.getMessage(), getOptions()); |
| } |
| final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); |
| boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); |
| String tenantId = null; |
| if (useTenantId) { |
| tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); |
| configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); |
                LOGGER.info(String.format("IndexScrutinyTool is running with tenantId %s", tenantId));
| } |
| connection = ConnectionUtil.getInputConnection(configuration); |
| final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); |
| final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); |
| String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); |
| final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); |
| String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); |
| boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); |
| boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); |
| boolean outputInvalidRows = cmdLine.hasOption(OUTPUT_INVALID_ROWS_OPTION.getOpt()); |
| SourceTable sourceTable = |
| cmdLine.hasOption(SOURCE_TABLE_OPTION.getOpt()) |
| ? SourceTable |
| .valueOf(cmdLine.getOptionValue(SOURCE_TABLE_OPTION.getOpt())) |
| : SourceTable.BOTH; |
| |
| long batchSize = |
| cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt()) |
| ? Long.parseLong(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt())) |
| : PhoenixConfigurationUtil.DEFAULT_SCRUTINY_BATCH_SIZE; |
| |
| long ts = |
| cmdLine.hasOption(TIMESTAMP.getOpt()) |
| ? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt())) |
| : EnvironmentEdgeManager.currentTimeMillis() - 60000; |
| |
| if (indexTable != null) { |
| if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) { |
| throw new IllegalArgumentException(String |
| .format(" %s is not an index table for %s ", indexTable, qDataTable)); |
| } |
| } |
| |
| String outputFormatOption = cmdLine.getOptionValue(OUTPUT_FORMAT_OPTION.getOpt()); |
| OutputFormat outputFormat = |
| outputFormatOption != null |
| ? OutputFormat.valueOf(outputFormatOption.toUpperCase()) |
| : OutputFormat.TABLE; |
| long outputMaxRows = |
| cmdLine.hasOption(OUTPUT_MAX.getOpt()) |
| ? Long.parseLong(cmdLine.getOptionValue(OUTPUT_MAX.getOpt())) |
| : 1000000L; |
            // end - parse command line configs
| |
| if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) { |
| // create the output table if it doesn't exist |
| Configuration outputConfiguration = HBaseConfiguration.create(configuration); |
| outputConfiguration.unset(PhoenixRuntime.TENANT_ID_ATTRIB); |
| try (Connection outputConn = ConnectionUtil.getOutputConnection(outputConfiguration)) { |
| outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL); |
| outputConn.createStatement() |
| .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL); |
| } |
| } |
| |
| LOGGER.info(String.format( |
| "Running scrutiny [schemaName=%s, dataTable=%s, indexTable=%s, useSnapshot=%s, timestamp=%s, batchSize=%s, outputBasePath=%s, outputFormat=%s, outputMaxRows=%s]", |
| schemaName, dataTable, indexTable, useSnapshot, ts, batchSize, basePath, |
| outputFormat, outputMaxRows)); |
| JobFactory jobFactory = |
| new JobFactory(connection, configuration, batchSize, useSnapshot, ts, |
| outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId); |
| // If we are running the scrutiny with both tables as the source, run two separate jobs, |
| // one for each direction |
| if (SourceTable.BOTH.equals(sourceTable)) { |
| jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable, |
| SourceTable.DATA_TABLE_SOURCE)); |
| jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable, |
| SourceTable.INDEX_TABLE_SOURCE)); |
| } else { |
| jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable, |
| sourceTable)); |
| } |
| |
| if (!isForeground) { |
| LOGGER.info("Running Index Scrutiny in Background - Submit async and exit"); |
| for (Job job : jobs) { |
| job.submit(); |
| } |
| return 0; |
| } |
            LOGGER.info(
                "Running Index Scrutiny in Foreground. Waits for the build to complete. This may take a long time!");
| boolean result = true; |
| for (Job job : jobs) { |
| result = result && job.waitForCompletion(true); |
| } |
| |
| // write the results to the output metadata table |
| if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) { |
| LOGGER.info("Writing results of jobs to output table " |
| + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME); |
| IndexScrutinyTableOutput.writeJobResults(connection, args, jobs); |
| } |
| |
| if (result) { |
| return 0; |
| } else { |
                LOGGER.error("IndexScrutinyTool job failed! Check logs for errors.");
| return -1; |
| } |
| } catch (Exception ex) { |
            LOGGER.error("An exception occurred while performing the scrutiny job: "
                    + ExceptionUtils.getMessage(ex), ex);
| return -1; |
| } finally { |
| try { |
| if (connection != null) { |
| connection.close(); |
| } |
| } catch (SQLException sqle) { |
                LOGGER.error("Failed to close connection", sqle);
                throw new RuntimeException("Failed to close connection", sqle);
| } |
| } |
| } |
| |
| @VisibleForTesting |
| public List<Job> getJobs() { |
| return jobs; |
| } |
| |
| public static void main(final String[] args) throws Exception { |
| int result = ToolRunner.run(new IndexScrutinyTool(), args); |
| System.exit(result); |
| } |
| |
| } |