| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.mapreduce.index; |
| |
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.logging.FileHandler;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;

import static org.apache.phoenix.query.QueryServicesOptions.
        GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
| |
/**
 * Command-line tool that upgrades (or rolls back) Phoenix global index tables between
 * the old {@link Indexer} coprocessor design and the new {@link IndexRegionObserver}/
 * {@link GlobalIndexChecker} design, optionally kicking off index rebuilds via
 * {@code IndexTool} afterwards.
 */
public class IndexUpgradeTool extends Configured implements Tool {

    private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName());

    // Command-line option definitions.
    private static final Option OPERATION_OPTION = new Option("o", "operation",
            true,
            "[Required] Operation to perform (upgrade/rollback)");
    private static final Option TABLE_OPTION = new Option("tb", "table", true,
            "[Required] Tables list ex. table1,table2");
    private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file",
            true,
            "[Optional] Tables list in a csv file");
    private static final Option DRY_RUN_OPTION = new Option("d", "dry-run",
            false,
            "[Optional] If passed this will output steps that will be executed");
    private static final Option HELP_OPTION = new Option("h", "help",
            false, "Help");
    private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
            true,
            "[Optional] Log file path where the logs are written");
    private static final Option INDEX_SYNC_REBUILD_OPTION = new Option("sr",
            "index-sync-rebuild",
            false,
            "[Optional] Whether or not synchronously rebuild the indexes; "
                    + "default rebuild asynchronous");

    private static final Option INDEX_VERIFY_OPTION = new Option("v",
            "verify",
            true,
            "[Optional] mode to run indexTool with verify options");

    public static final String UPGRADE_OP = "upgrade";
    public static final String ROLLBACK_OP = "rollback";
    // Sentinel tenant id meaning "global (non-tenant-owned) index".
    private static final String GLOBAL_INDEX_ID = "#NA#";
    private IndexTool indexingTool;

    // data table physical name -> physical names of its global/view-index tables
    private HashMap<String, HashSet<String>> tablesAndIndexes = new HashMap<>();
    // NOTE(review): this field appears unused here; rebuildIndexes() builds a local map
    // with the same name — confirm before removing.
    private HashMap<String, HashMap<String,IndexInfo>> rebuildMap = new HashMap<>();
    // coprocessor properties applied when loading Indexer/IndexRegionObserver
    private HashMap<String, String> prop = new HashMap<>();
    // empty property map used when loading GlobalIndexChecker
    private HashMap<String, String> emptyProp = new HashMap<>();

    private boolean dryRun, upgrade, syncRebuild;
    private String operation;
    private String inputTables;
    private String logFile;
    private String inputFile;
    // set true once the post-disable wait for client cache expiry has completed
    private boolean isWaitComplete = false;
    private String verify;

    // test mode: skips waits and reuses the injected IndexTool instead of creating one
    private boolean test = false;
| |
    // Simple accessors/mutators, primarily for programmatic/test-driven use where the
    // command line is not parsed.

    public void setDryRun(boolean dryRun) {
        this.dryRun = dryRun;
    }

    public void setInputTables(String inputTables) {
        this.inputTables = inputTables;
    }

    public void setLogFile(String logFile) {
        this.logFile = logFile;
    }

    public void setInputFile(String inputFile) {
        this.inputFile = inputFile;
    }

    public void setTest(boolean test) { this.test = test; }

    public boolean getIsWaitComplete() { return this.isWaitComplete; }

    public boolean getDryRun() { return this.dryRun; }

    public String getVerify() { return this.verify; }

    public String getInputTables() {
        return this.inputTables;
    }

    public String getLogFile() {
        return this.logFile;
    }

    public String getOperation() {
        return this.operation;
    }
| |
    /**
     * Programmatic constructor (e.g. from tests): supplies all settings up front instead
     * of parsing them from the command line.
     *
     * @param mode       operation to perform, {@link #UPGRADE_OP} or {@link #ROLLBACK_OP}
     * @param tables     comma-separated list of data tables
     * @param inputFile  optional CSV file containing the tables list
     * @param outputFile optional log file path
     * @param dryRun     when true, only logs the steps without executing them
     * @param indexTool  IndexTool instance to use for rebuilds
     */
    public IndexUpgradeTool(String mode, String tables, String inputFile,
            String outputFile, boolean dryRun, IndexTool indexTool) {
        this.operation = mode;
        this.inputTables = tables;
        this.inputFile = inputFile;
        this.logFile = outputFile;
        this.dryRun = dryRun;
        this.indexingTool = indexTool;
    }

    /** Default constructor for ToolRunner/CLI use; fields are set by initializeTool(). */
    public IndexUpgradeTool () { }
| |
    /**
     * ToolRunner entry point: parses options, configures the tool, and executes the
     * requested operation.
     */
    @Override
    public int run(String[] args) throws Exception {
        CommandLine cmdLine = null;
        try {
            cmdLine = parseOptions(args);
            LOGGER.info("Index Upgrade tool initiated: " + String.join(",", args));
        } catch (IllegalStateException e) {
            // printHelpAndExit terminates the JVM, so execution never continues below
            // with a null cmdLine.
            printHelpAndExit(e.getMessage(), getOptions());
        }
        initializeTool(cmdLine);
        prepareToolSetup();
        executeTool();
        return 0;
    }
| |
| /** |
| * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are |
| * missing. |
| * @param args supplied command line arguments |
| * @return the parsed command line |
| */ |
| @VisibleForTesting |
| public CommandLine parseOptions(String[] args) { |
| |
| final Options options = getOptions(); |
| |
| CommandLineParser parser = new DefaultParser(); |
| CommandLine cmdLine = null; |
| try { |
| cmdLine = parser.parse(options, args); |
| } catch (ParseException e) { |
| printHelpAndExit("severe parsing command line options: " + e.getMessage(), |
| options); |
| } |
| if (cmdLine.hasOption(HELP_OPTION.getOpt())) { |
| printHelpAndExit(options, 0); |
| } |
| if (!cmdLine.hasOption(OPERATION_OPTION.getOpt())) { |
| throw new IllegalStateException(OPERATION_OPTION.getLongOpt() |
| +" is a mandatory parameter"); |
| } |
| if (cmdLine.hasOption(DRY_RUN_OPTION.getOpt()) |
| && !cmdLine.hasOption(LOG_FILE_OPTION.getOpt())) { |
| throw new IllegalStateException("Log file with "+TABLE_OPTION.getLongOpt() |
| + " is mandatory if " + DRY_RUN_OPTION.getLongOpt() +" is passed"); |
| } |
| if (!(cmdLine.hasOption(TABLE_OPTION.getOpt())) |
| && !(cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) { |
| throw new IllegalStateException("Tables list should be passed in either with" |
| +TABLE_OPTION.getLongOpt() + " or " + TABLE_CSV_FILE_OPTION.getLongOpt()); |
| } |
| if ((cmdLine.hasOption(TABLE_OPTION.getOpt())) |
| && (cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) { |
| throw new IllegalStateException("Tables list passed in with" |
| +TABLE_OPTION.getLongOpt() + " and " + TABLE_CSV_FILE_OPTION.getLongOpt() |
| + "; specify only one."); |
| } |
| return cmdLine; |
| } |
| |
| private void printHelpAndExit(String severeMessage, Options options) { |
| System.err.println(severeMessage); |
| printHelpAndExit(options, 1); |
| } |
| |
| private void printHelpAndExit(Options options, int exitCode) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("help", options); |
| System.exit(exitCode); |
| } |
| |
| private Options getOptions() { |
| final Options options = new Options(); |
| options.addOption(OPERATION_OPTION); |
| TABLE_OPTION.setOptionalArg(true); |
| options.addOption(TABLE_OPTION); |
| TABLE_CSV_FILE_OPTION.setOptionalArg(true); |
| options.addOption(TABLE_CSV_FILE_OPTION); |
| DRY_RUN_OPTION.setOptionalArg(true); |
| options.addOption(DRY_RUN_OPTION); |
| LOG_FILE_OPTION.setOptionalArg(true); |
| options.addOption(LOG_FILE_OPTION); |
| options.addOption(HELP_OPTION); |
| INDEX_SYNC_REBUILD_OPTION.setOptionalArg(true); |
| options.addOption(INDEX_SYNC_REBUILD_OPTION); |
| INDEX_VERIFY_OPTION.setOptionalArg(true); |
| options.addOption(INDEX_VERIFY_OPTION); |
| return options; |
| } |
| |
    /** Copies the parsed command-line option values into the tool's fields. */
    @VisibleForTesting
    public void initializeTool(CommandLine cmdLine) {
        operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
        inputTables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
        logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
        inputFile = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
        dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
        syncRebuild = cmdLine.hasOption(INDEX_SYNC_REBUILD_OPTION.getOpt());
        verify = cmdLine.getOptionValue(INDEX_VERIFY_OPTION.getOpt());
    }
| |
| @VisibleForTesting |
| public void prepareToolSetup() { |
| try { |
| if (logFile != null) { |
| FileHandler fh = new FileHandler(logFile); |
| fh.setFormatter(new SimpleFormatter()); |
| LOGGER.addHandler(fh); |
| } |
| |
| prop.put(Indexer.INDEX_BUILDER_CONF_KEY, PhoenixIndexBuilder.class.getName()); |
| prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); |
| |
| if (inputTables == null) { |
| inputTables = new String(Files.readAllBytes(Paths.get(inputFile))); |
| } |
| if (inputTables == null) { |
| LOGGER.severe("Tables' list is not available; use -tb or -f option"); |
| } |
| LOGGER.info("list of tables passed: " + inputTables); |
| |
| if (operation.equalsIgnoreCase(UPGRADE_OP)) { |
| upgrade = true; |
| } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) { |
| upgrade = false; |
| } else { |
| throw new IllegalStateException("Invalid option provided for " |
| + OPERATION_OPTION.getOpt() + " expected values: {upgrade, rollback}"); |
| } |
| if (dryRun) { |
| LOGGER.info("This is the beginning of the tool with dry run."); |
| } |
| } catch (IOException e) { |
| LOGGER.severe("Something went wrong "+e); |
| System.exit(-1); |
| } |
| } |
| |
    /**
     * Entry point for the actual work: opens a Phoenix connection, collects the physical
     * data-table -&gt; index-tables mapping, and runs the upgrade/rollback steps.
     * @return 0 on success, -1 when extraction or the connection fails
     */
    @VisibleForTesting
    public int executeTool() {
        Configuration conf = HBaseConfiguration.addHbaseResources(getConf());

        try (Connection conn = ConnectionUtil.getInputConnection(conf)) {

            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class)
                    .getQueryServices();

            // Populate tablesAndIndexes from the comma-separated input list.
            boolean status = extractTablesAndIndexes(conn.unwrap(PhoenixConnection.class));

            if (status) {
                return executeTool(conn, queryServices, conf);
            }
        } catch (SQLException e) {
            LOGGER.severe("Something went wrong in executing tool "+ e);
        }
        return -1;
    }
| |
    /**
     * Splits the input tables into immutable and mutable sets and runs the operation on
     * each. Immutable tables are disabled/modified first and left disabled while client
     * metadata caches expire, then re-enabled and rebuilt at the end.
     * @return 0 on success, -1 when a table's metadata cannot be resolved
     */
    private int executeTool(Connection conn,
            ConnectionQueryServices queryServices,
            Configuration conf) {
        ArrayList<String> immutableList = new ArrayList<>();
        ArrayList<String> mutableList = new ArrayList<>();
        for (Map.Entry<String, HashSet<String>> entry :tablesAndIndexes.entrySet()) {
            String dataTableFullName = entry.getKey();
            try {
                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
                if (dataTable.isImmutableRows()) {
                    //add to list where immutable tables are processed in a different function
                    immutableList.add(dataTableFullName);
                } else {
                    mutableList.add(dataTableFullName);
                }
            } catch (SQLException e) {
                LOGGER.severe("Something went wrong while getting the PTable "
                        + dataTableFullName + " "+e);
                return -1;
            }
        }
        // Immutable tables: disable+modify now, enable after the cache-expiry wait below.
        long startWaitTime = executeToolForImmutableTables(queryServices, immutableList);
        executeToolForMutableTables(conn, queryServices, conf, mutableList);
        enableImmutableTables(queryServices, immutableList, startWaitTime);
        rebuildIndexes(conn, conf, immutableList);
        return 0;
    }
| |
    /**
     * Disables and modifies each immutable table and its indexes. Tables are deliberately
     * left disabled; the caller re-enables them after the client cache-expiry wait.
     * @return the timestamp (ms) at which the wait period starts
     */
    private long executeToolForImmutableTables(ConnectionQueryServices queryServices,
            ArrayList<String> immutableList) {
        LOGGER.info("Started " + operation + " for immutable tables");
        for (String dataTableFullName : immutableList) {
            try (Admin admin = queryServices.getAdmin()) {
                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
                LOGGER.info("Executing " + operation + " of " + dataTableFullName
                        + " (immutable)");
                disableTable(admin, dataTableFullName, indexes);
                modifyTable(admin, dataTableFullName, indexes);
            } catch (IOException | SQLException e) {
                LOGGER.severe("Something went wrong while disabling "
                        + "or modifying immutable table " + e);
                // Attempt to revert this table and drop it from further processing.
                handleFailure(queryServices, dataTableFullName, immutableList);
            }
        }
        long startWaitTime = EnvironmentEdgeManager.currentTimeMillis();
        return startWaitTime;
    }
| |
    /**
     * Runs the full disable → modify → enable cycle on each mutable table and its
     * indexes, then kicks off index rebuilds (upgrade only).
     */
    private void executeToolForMutableTables(Connection conn,
            ConnectionQueryServices queryServices,
            Configuration conf,
            ArrayList<String> mutableTables) {
        LOGGER.info("Started " + operation + " for mutable tables");
        for (String dataTableFullName : mutableTables) {
            try (Admin admin = queryServices.getAdmin()) {
                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
                LOGGER.info("Executing " + operation + " of " + dataTableFullName);
                disableTable(admin, dataTableFullName, indexes);
                modifyTable(admin, dataTableFullName, indexes);
                enableTable(admin, dataTableFullName, indexes);
                LOGGER.info("Completed " + operation + " of " + dataTableFullName);
            } catch (IOException | SQLException e) {
                LOGGER.severe("Something went wrong while executing "
                        + operation + " steps for "+ dataTableFullName + " " + e);
                // Attempt to revert this table and drop it from further processing.
                handleFailure(queryServices, dataTableFullName, mutableTables);
            }
        }
        // Opportunistically kick-off index rebuilds after upgrade operation
        rebuildIndexes(conn, conf, mutableTables);
    }
| |
    /**
     * Best-effort revert of a partially applied upgrade/rollback on one table.
     * Temporarily flips the {@code upgrade} flag so that disable/modify/enable run the
     * inverse operation, then removes the table from the map and the work list so it is
     * not processed further.
     * @throws RuntimeException when the revert itself fails; the remaining tables in the
     *         list must then be checked manually
     */
    private void handleFailure(ConnectionQueryServices queryServices,
            String dataTableFullName,
            ArrayList<String> tableList) {
        LOGGER.info("Performing error handling to revert the steps taken during " + operation);
        HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
        try (Admin admin = queryServices.getAdmin()) {
            // Flip the direction so modifyTable applies the inverse coprocessor changes.
            upgrade = !upgrade;
            disableTable(admin, dataTableFullName, indexes);
            modifyTable(admin, dataTableFullName, indexes);
            enableTable(admin, dataTableFullName, indexes);
            upgrade = !upgrade;

            tablesAndIndexes.remove(dataTableFullName); //removing from the map
            tableList.remove(dataTableFullName); //removing from the list

            LOGGER.severe(dataTableFullName+" has been removed from the list as tool failed"
                    + " to perform "+operation);
        } catch (IOException | SQLException e) {
            LOGGER.severe("Revert of the "+operation +" failed in error handling, "
                    + "throwing runtime exception");
            LOGGER.severe("Confirm the state for "+getSubListString(tableList, dataTableFullName));
            throw new RuntimeException(e);
        }
    }
| |
    /**
     * Waits until getWaitMoreTime() reports the cache-expiry window has elapsed (skipped
     * under test/dry-run), then re-enables every immutable table and its indexes.
     * @throws RuntimeException when enabling fails; the remaining tables must be enabled
     *         and rebuilt manually
     */
    private void enableImmutableTables(ConnectionQueryServices queryServices,
            ArrayList<String> immutableList,
            long startWaitTime) {

        while(true) {
            long waitMore = getWaitMoreTime(startWaitTime);
            if (waitMore <= 0) {
                isWaitComplete = true;
                break;
            }
            try {
                // If the table is immutable, we need to wait for clients to purge
                // their caches of table metadata
                Thread.sleep(waitMore);
                isWaitComplete = true;
            } catch(InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here; the loop just
                // recomputes the remaining time and sleeps again — confirm this is intended.
                LOGGER.warning("Sleep before starting index rebuild is interrupted. "
                        + "Attempting to sleep again! " + e.getMessage());
            }
        }

        for (String dataTableFullName: immutableList) {
            try (Admin admin = queryServices.getAdmin()) {
                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
                enableTable(admin, dataTableFullName, indexes);
            } catch (IOException | SQLException e) {
                LOGGER.severe("Something went wrong while enabling immutable table " + e);
                //removing to avoid any rebuilds after upgrade
                tablesAndIndexes.remove(dataTableFullName);
                immutableList.remove(dataTableFullName);
                throw new RuntimeException("Manually enable the following tables "
                        + getSubListString(immutableList, dataTableFullName)
                        + " and run the index rebuild ", e);
            }
        }
    }
| |
| private String getSubListString(ArrayList<String> tableList, String dataTableFullName) { |
| return StringUtils.join(",", tableList.subList(tableList.indexOf(dataTableFullName), |
| tableList.size())); |
| } |
| |
| private long getWaitMoreTime(long startWaitTime) { |
| int waitTime = GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN+1; |
| long endWaitTime = EnvironmentEdgeManager.currentTimeMillis(); |
| if(test || dryRun) { |
| return 0; //no wait |
| } |
| return (((waitTime) * 60000) - Math.abs(endWaitTime-startWaitTime)); |
| } |
| |
| private void modifyTable(Admin admin, String dataTableFullName, HashSet<String> indexes) |
| throws IOException { |
| if (upgrade) { |
| modifyIndexTable(admin, indexes); |
| modifyDataTable(admin, dataTableFullName); |
| } else { |
| modifyDataTable(admin, dataTableFullName); |
| modifyIndexTable(admin, indexes); |
| } |
| } |
| |
| private void disableTable(Admin admin, String dataTable, HashSet<String>indexes) |
| throws IOException { |
| if (admin.isTableEnabled(TableName.valueOf(dataTable))) { |
| if (!dryRun) { |
| admin.disableTable(TableName.valueOf(dataTable)); |
| } |
| LOGGER.info("Disabled data table " + dataTable); |
| } else { |
| LOGGER.info( "Data table " + dataTable + " is already disabled"); |
| } |
| for (String indexName : indexes) { |
| if (admin.isTableEnabled(TableName.valueOf(indexName))) { |
| if (!dryRun) { |
| admin.disableTable(TableName.valueOf(indexName)); |
| } |
| LOGGER.info("Disabled index table " + indexName); |
| } else { |
| LOGGER.info( "Index table " + indexName + " is already disabled"); |
| } |
| } |
| } |
| |
| private void enableTable(Admin admin, String dataTable, HashSet<String>indexes) |
| throws IOException { |
| if (!admin.isTableEnabled(TableName.valueOf(dataTable))) { |
| if (!dryRun) { |
| admin.enableTable(TableName.valueOf(dataTable)); |
| } |
| LOGGER.info("Enabled data table " + dataTable); |
| } else { |
| LOGGER.info( "Data table " + dataTable + " is already enabled"); |
| } |
| for (String indexName : indexes) { |
| if(!admin.isTableEnabled(TableName.valueOf(indexName))) { |
| if (!dryRun) { |
| admin.enableTable(TableName.valueOf(indexName)); |
| } |
| LOGGER.info("Enabled index table " + indexName); |
| } else { |
| LOGGER.info( "Index table " + indexName + " is already enabled"); |
| } |
| } |
| } |
| |
| private void rebuildIndexes(Connection conn, Configuration conf, ArrayList<String> tableList) { |
| if (!upgrade) { |
| return; |
| } |
| for (String table: tableList) { |
| rebuildIndexes(conn, conf, table); |
| } |
| } |
| |
| private void rebuildIndexes(Connection conn, Configuration conf, String dataTableFullName) { |
| try { |
| HashMap<String, IndexInfo> rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName); |
| |
| //for rebuilding indexes in case of upgrade and if there are indexes on the table/view. |
| if (rebuildMap.isEmpty()) { |
| LOGGER.info("No indexes to rebuild for table " + dataTableFullName); |
| return; |
| } |
| if(!test) { |
| indexingTool = new IndexTool(); |
| indexingTool.setConf(conf); |
| } |
| startIndexRebuilds(conn, dataTableFullName, rebuildMap, indexingTool); |
| |
| } catch (SQLException e) { |
| LOGGER.severe("Failed to prepare the map for index rebuilds " + e); |
| throw new RuntimeException("Failed to prepare the map for index rebuilds"); |
| } |
| } |
| |
| private void modifyDataTable(Admin admin, String tableName) |
| throws IOException { |
| TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder |
| .newBuilder(admin.getDescriptor(TableName.valueOf(tableName))); |
| if (upgrade) { |
| removeCoprocessor(admin, tableName, tableDescBuilder, Indexer.class.getName()); |
| addCoprocessor(admin, tableName, tableDescBuilder, IndexRegionObserver.class.getName()); |
| } else { |
| removeCoprocessor(admin, tableName, tableDescBuilder, IndexRegionObserver.class.getName()); |
| addCoprocessor(admin, tableName, tableDescBuilder, Indexer.class.getName()); |
| } |
| if (!dryRun) { |
| admin.modifyTable(tableDescBuilder.build()); |
| } |
| } |
| |
    /**
     * Loads {@code coprocName} at the default coprocessor priority with the
     * index-builder properties ({@code prop}).
     */
    private void addCoprocessor(Admin admin, String tableName, TableDescriptorBuilder tableDescBuilder,
            String coprocName) throws IOException {
        addCoprocessor(admin, tableName, tableDescBuilder, coprocName,
                QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
    }
| |
| private void addCoprocessor(Admin admin, String tableName, TableDescriptorBuilder tableDescBuilder, |
| String coprocName,int priority, Map<String, String> propsToAdd) throws IOException { |
| if (!admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) { |
| if (!dryRun) { |
| CoprocessorDescriptorBuilder coprocBuilder = |
| CoprocessorDescriptorBuilder.newBuilder(coprocName); |
| coprocBuilder.setPriority(priority).setProperties(propsToAdd); |
| tableDescBuilder.setCoprocessor(coprocBuilder.build()); |
| } |
| LOGGER.info("Loaded " + coprocName + " coprocessor on table " + tableName); |
| } else { |
| LOGGER.info(coprocName + " coprocessor on table " + tableName + "is already loaded"); |
| } |
| } |
| |
| private void removeCoprocessor(Admin admin, String tableName, TableDescriptorBuilder tableDescBuilder, |
| String coprocName) throws IOException { |
| if (admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) { |
| if (!dryRun) { |
| tableDescBuilder.removeCoprocessor(coprocName); |
| } |
| LOGGER.info("Unloaded "+ coprocName +"coprocessor on table " + tableName); |
| } else { |
| LOGGER.info(coprocName + " coprocessor on table " + tableName + " is already unloaded"); |
| } |
| } |
| |
    /**
     * Adds (upgrade) or removes (rollback) the GlobalIndexChecker coprocessor on each
     * physical index table. In dry-run mode no descriptor change is committed.
     */
    private void modifyIndexTable(Admin admin, HashSet<String> indexes)
            throws IOException {
        for (String indexName : indexes) {
            TableDescriptorBuilder indexTableDescBuilder = TableDescriptorBuilder
                    .newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
            if (upgrade) {
                //GlobalIndexChecker needs to be a "lower" priority than all the others so that it
                //goes first. It also doesn't get the codec props the IndexRegionObserver needs
                addCoprocessor(admin, indexName, indexTableDescBuilder, GlobalIndexChecker.class.getName(),
                        QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY -1, emptyProp);
            } else {
                removeCoprocessor(admin, indexName, indexTableDescBuilder, GlobalIndexChecker.class.getName());
            }
            if (!dryRun) {
                admin.modifyTable(indexTableDescBuilder.build());
            }
        }
    }
| |
| private int startIndexRebuilds(Connection conn, |
| String dataTable, |
| HashMap<String, IndexInfo> indexInfos, |
| IndexTool indexingTool) { |
| |
| for(Map.Entry<String, IndexInfo> entry : indexInfos.entrySet()) { |
| String index = entry.getKey(); |
| IndexInfo indexInfo = entry.getValue(); |
| String indexName = SchemaUtil.getTableNameFromFullName(index); |
| String tenantId = indexInfo.getTenantId(); |
| String baseTable = indexInfo.getBaseTable(); |
| String schema = indexInfo.getSchemaName(); |
| String outFile = "/tmp/index_rebuild_" +schema+"_"+ indexName + |
| (GLOBAL_INDEX_ID.equals(tenantId)?"":"_"+tenantId) +"_" |
| + UUID.randomUUID().toString(); |
| String[] args = getIndexToolArgValues(schema, baseTable, indexName, outFile, tenantId); |
| Connection newConnection = conn; |
| Connection tenantConnection = null; |
| try { |
| LOGGER.info("Rebuilding index: " + String.join(",", args)); |
| if (!dryRun) { |
| // If the index is in DISABLED state, indexTool will fail. First to ALTER REBUILD ASYNC. |
| // ALTER REBUILD ASYNC will set the index state to BUILDING which is safe to make ACTIVE later. |
| if (!Strings.isNullOrEmpty(tenantId) && !GLOBAL_INDEX_ID.equals(tenantId)) { |
| Configuration conf = HBaseConfiguration.addHbaseResources(getConf()); |
| conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); |
| newConnection = ConnectionUtil.getInputConnection(conf); |
| tenantConnection = newConnection; |
| } |
| |
| PTable indexPTable = PhoenixRuntime.getTable(newConnection, indexInfo.getPhysicalIndexTableName()); |
| if (indexPTable.getIndexState() == PIndexState.DISABLE) { |
| String dataTableFullName = dataTable; |
| if (!dataTableFullName.contains(":") && !dataTableFullName.contains(".")) { |
| dataTableFullName = SchemaUtil.getTableName(schema, dataTable); |
| } |
| String |
| stmt = |
| String.format("ALTER INDEX %s ON %s REBUILD ASYNC", indexName, |
| dataTableFullName); |
| newConnection.createStatement().execute(stmt); |
| } |
| |
| indexingTool.run(args); |
| } |
| } catch (Exception e) { |
| LOGGER.severe("Something went wrong while building the index " |
| + index + " " + e); |
| return -1; |
| } finally { |
| try { |
| // Close tenant connection |
| if (tenantConnection != null) { |
| tenantConnection.close(); |
| } |
| } catch (SQLException e) { |
| LOGGER.warning("Couldn't close tenant connection. Ignoring"); |
| } |
| } |
| } |
| return 0; |
| } |
| |
| public String[] getIndexToolArgValues(String schema, String baseTable, String indexName, |
| String outFile, String tenantId) { |
| String args[] = { "-s", schema, "-dt", baseTable, "-it", indexName, |
| "-direct", "-op", outFile }; |
| ArrayList<String> list = new ArrayList<>(Arrays.asList(args)); |
| if (!GLOBAL_INDEX_ID.equals(tenantId)) { |
| list.add("-tenant"); |
| list.add(tenantId); |
| } |
| if (syncRebuild) { |
| list.add("-runfg"); |
| } |
| if(!Strings.isNullOrEmpty(verify)) { |
| list.add("-v"); |
| list.add(verify); |
| } |
| return list.toArray(new String[list.size()]); |
| } |
| |
    /**
     * Resolves each name in the comma-separated {@code inputTables} list and fills
     * {@code tablesAndIndexes} with the physical names of its global indexes (and the
     * view-index table when one exists). Transactional tables and non-TABLE types are
     * skipped with a log message.
     * @return true when every table was processed, false on the first SQL failure
     */
    private boolean extractTablesAndIndexes(PhoenixConnection conn) {
        String [] tables = inputTables.trim().split(",");
        PTable dataTable = null;
        try {
            for (String tableName : tables) {
                HashSet<String> physicalIndexes = new HashSet<>();
                dataTable = PhoenixRuntime.getTableNoCache(conn, tableName);
                String physicalTableName = dataTable.getPhysicalName().getString();
                if (!dataTable.isTransactional() && dataTable.getType().equals(PTableType.TABLE)) {
                    for (PTable indexTable : dataTable.getIndexes()) {
                        // Only global indexes get the coprocessor swap; local indexes are ignored.
                        if (indexTable.getIndexType().equals(PTable.IndexType.GLOBAL)) {
                            String physicalIndexName = indexTable.getPhysicalName().getString();
                            physicalIndexes.add(physicalIndexName);
                        }
                    }
                    if (MetaDataUtil.hasViewIndexTable(conn, dataTable.getPhysicalName())) {
                        String viewIndexPhysicalName = MetaDataUtil
                                .getViewIndexPhysicalName(physicalTableName);
                        physicalIndexes.add(viewIndexPhysicalName);
                    }
                    //for upgrade or rollback
                    tablesAndIndexes.put(physicalTableName, physicalIndexes);
                } else {
                    LOGGER.info("Skipping Table " + tableName + " because it is " +
                            (dataTable.isTransactional() ? "transactional" : "not a data table"));
                }
            }
            return true;
        } catch (SQLException e) {
            LOGGER.severe("Failed to find list of indexes "+e);
            if (dataTable == null) {
                LOGGER.severe("Unable to find the provided data table");
            }
            return false;
        }
    }
| |
| private HashMap<String, IndexInfo> prepareToRebuildIndexes(Connection conn, |
| String dataTableFullName) throws SQLException { |
| |
| HashMap<String, IndexInfo> indexInfos = new HashMap<>(); |
| HashSet<String> physicalIndexes = tablesAndIndexes.get(dataTableFullName); |
| |
| String viewIndexPhysicalName = MetaDataUtil |
| .getViewIndexPhysicalName(dataTableFullName); |
| boolean hasViewIndex = physicalIndexes.contains(viewIndexPhysicalName); |
| String schemaName = SchemaUtil.getSchemaNameFromFullName(dataTableFullName); |
| String tableName = SchemaUtil.getTableNameFromFullName(dataTableFullName); |
| |
| for (String physicalIndexName : physicalIndexes) { |
| if (physicalIndexName.equals(viewIndexPhysicalName)) { |
| continue; |
| } |
| String indexTableName = SchemaUtil.getTableNameFromFullName(physicalIndexName); |
| String pIndexName = SchemaUtil.getTableName(schemaName, indexTableName); |
| IndexInfo indexInfo = new IndexInfo(schemaName, tableName, |
| GLOBAL_INDEX_ID, pIndexName, pIndexName); |
| indexInfos.put(physicalIndexName, indexInfo); |
| } |
| |
| if (hasViewIndex) { |
| String viewSql = getViewSql(tableName, schemaName); |
| |
| ResultSet rs = conn.createStatement().executeQuery(viewSql); |
| |
| while (rs.next()) { |
| String viewFullName = rs.getString(1); |
| String viewName = SchemaUtil.getTableNameFromFullName(viewFullName); |
| String tenantId = rs.getString(2); |
| ArrayList<String> viewIndexes = findViewIndexes(conn, schemaName, viewName, |
| tenantId); |
| for (String viewIndex : viewIndexes) { |
| IndexInfo indexInfo = new IndexInfo(schemaName, viewName, |
| tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex, viewIndexPhysicalName); |
| indexInfos.put(viewIndex, indexInfo); |
| } |
| } |
| } |
| return indexInfos; |
| } |
| |
    /**
     * SQL listing each child view (COLUMN_FAMILY holds the view full name) and its
     * TENANT_ID for the given table, from SYSTEM.CHILD_LINK.
     * NOTE(review): identifiers are concatenated directly into the SQL; names containing
     * quotes would break the query — confirm inputs only ever come from the system catalog.
     */
    @VisibleForTesting
    public String getViewSql(String tableName, String schemaName) {
        return "SELECT DISTINCT COLUMN_FAMILY, TENANT_ID FROM "
                + "SYSTEM.CHILD_LINK "
                + "WHERE TABLE_NAME = \'" + tableName + "\'"
                + (!Strings.isNullOrEmpty(schemaName) ? " AND TABLE_SCHEM = \'"
                + schemaName + "\'" : "")
                + " AND LINK_TYPE = "
                + PTable.LinkType.CHILD_TABLE.getSerializedValue();
    }
| |
| private ArrayList<String> findViewIndexes(Connection conn, String schemaName, String viewName, |
| String tenantId) throws SQLException { |
| |
| String viewIndexesSql = getViewIndexesSql(viewName, schemaName, tenantId); |
| ArrayList<String> viewIndexes = new ArrayList<>(); |
| ResultSet |
| rs = |
| conn.createStatement().executeQuery(viewIndexesSql); |
| while(rs.next()) { |
| String viewIndexName = rs.getString(1); |
| viewIndexes.add(viewIndexName); |
| } |
| return viewIndexes; |
| } |
| |
    /**
     * SQL listing the index names (COLUMN_FAMILY) declared on a view in SYSTEM.CATALOG,
     * optionally restricted to one tenant.
     * NOTE(review): identifiers are concatenated directly into the SQL; names containing
     * quotes would break the query — confirm inputs only ever come from the system catalog.
     */
    @VisibleForTesting
    public String getViewIndexesSql(String viewName, String schemaName, String tenantId) {
        return "SELECT DISTINCT COLUMN_FAMILY FROM "
                + "SYSTEM.CATALOG "
                + "WHERE TABLE_NAME = \'" + viewName + "\'"
                + (!Strings.isNullOrEmpty(schemaName) ? " AND TABLE_SCHEM = \'"
                + schemaName + "\'" : "")
                + " AND LINK_TYPE = " + PTable.LinkType.INDEX_TABLE.getSerializedValue()
                + (tenantId != null ? " AND TENANT_ID = \'" + tenantId + "\'" : "");
    }
| |
| private class IndexInfo { |
| final private String schemaName; |
| final private String baseTable; |
| final private String tenantId; |
| final private String indexName; |
| final private String physicalIndexTableName; |
| |
| public IndexInfo(String schemaName, String baseTable, String tenantId, String indexName, |
| String physicalIndexTableName) { |
| this.schemaName = schemaName; |
| this.baseTable = baseTable; |
| this.tenantId = tenantId; |
| this.indexName = indexName; |
| this.physicalIndexTableName = physicalIndexTableName; |
| } |
| |
| public String getSchemaName() { |
| return schemaName; |
| } |
| |
| public String getBaseTable() { |
| return baseTable; |
| } |
| |
| public String getTenantId() { |
| return tenantId; |
| } |
| |
| public String getIndexName() { |
| return indexName; |
| } |
| |
| public String getPhysicalIndexTableName() { |
| return physicalIndexTableName; |
| } |
| } |
| |
| public static void main (String[] args) throws Exception { |
| int result = ToolRunner.run(new IndexUpgradeTool(), args); |
| System.exit(result); |
| } |
| } |