PHOENIX-5531: IndexUpgradeTool crashes for tables without any indexes + sleep problems (Priyank Porwal + Swaroopa Kadam)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index 45ad003..8a7315d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -162,10 +162,11 @@
"CREATE TABLE TEST.MULTI_TENANT_TABLE " + " (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+ ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
conn.createStatement().execute(createTblStr);
- conn.createStatement().execute("CREATE TABLE TRANSACTIONAL_TABLE(id bigint NOT NULL "
- + "PRIMARY KEY, a.name varchar, sal bigint, address varchar) "
- + " TRANSACTIONAL=true "//", TRANSACTION_PROVIDER='TEPHRA' "
- + ((tableDDLOptions.trim().length() > 0) ? "," : "") + tableDDLOptions);
+ String createTxnTableStr = "CREATE TABLE TRANSACTIONAL_TABLE(id bigint NOT NULL "
+ + "PRIMARY KEY, a.name varchar, sal bigint, address varchar) "
+ + " TRANSACTIONAL=true , TRANSACTION_PROVIDER='TEPHRA' "
+ + ((tableDDLOptions.trim().length() > 0) ? "," : "") + tableDDLOptions;
+ conn.createStatement().execute(createTxnTableStr);
//views
conn.createStatement().execute("CREATE VIEW TEST.MOCK1_VIEW (view_column varchar) "
@@ -304,6 +305,20 @@
}
@Test
+ public void testToolWithNoIndex() throws Exception {
+ if (!upgrade || isNamespaceEnabled) {
+ return;
+ }
+ conn.createStatement().execute("CREATE TABLE TEST.NEW_TABLE (id bigint NOT NULL "
+ + "PRIMARY KEY, a.name varchar, sal bigint, address varchar)" + tableDDLOptions);
+ iut.setInputTables("TEST.NEW_TABLE");
+ iut.prepareToolSetup();
+ int status = iut.executeTool();
+ Assert.assertEquals(0, status);
+ conn.createStatement().execute("DROP TABLE TEST.NEW_TABLE");
+ }
+
+ @Test
public void testToolWithInputFileParameter() throws Exception {
BufferedWriter writer = new BufferedWriter(new FileWriter(new File(INPUT_FILE)));
writer.write(INPUT_LIST);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index f1fbae8..975c157 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -96,7 +96,8 @@
private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
true,
"Log file path where the logs are written");
- private static final Option INDEX_SYNC_REBUILD_OPTION = new Option("sr", "index-sync-rebuild",
+ private static final Option INDEX_SYNC_REBUILD_OPTION = new Option("sr",
+ "index-sync-rebuild",
false,
"[Optional]Whether or not synchronously rebuild the indexes; default rebuild asynchronous");
@@ -333,26 +334,40 @@
LOGGER.fine("Executing " + operation + " for " + dataTableFullName);
boolean mutable = !(dataTable.isImmutableRows());
+
+ disableTable(admin, dataTableFullName, indexes);
+ modifyTable(admin, dataTableFullName, indexes);
if (!mutable) {
- LOGGER.fine("Data table is immutable, waiting for "
- + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
- + " minutes for client cache to expire");
- if (!test) {
+ if (!(test || dryRun || indexes.isEmpty())) {
+ // If the table is immutable, we need to wait for clients to purge
+ // their caches of table metadata
+ LOGGER.fine("Data table is immutable, waiting for "
+ + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
+ + " minutes for client cache to expire");
Thread.sleep(
(GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1) * 60 * 1000);
}
}
- disableTable(admin, dataTableFullName, indexes);
- modifyTable(admin, dataTableFullName, indexes);
enableTable(admin, dataTableFullName, indexes);
- rebuildIndexes(conn, conf, dataTableFullName);
} catch (IOException | SQLException | InterruptedException e) {
- LOGGER.severe("Something went wrong while executing " + operation + " steps " + e);
+ LOGGER.severe("Something went wrong while executing " + operation
+ + " steps " + e);
return -1;
}
return 0;
}).collect(Collectors.toList());
- return statusList.parallelStream().anyMatch(p -> p == -1) ? -1 : 0;
+
+ int status = statusList.parallelStream().anyMatch(p -> p == -1) ? -1 : 0;
+
+ // Opportunistically kick-off index rebuilds after upgrade operation
+ if (upgrade && status == 0) {
+ for (String dataTableFullName : tablesAndIndexes.keySet()) {
+ rebuildIndexes(conn, conf, dataTableFullName);
+ LOGGER.info("Started index rebuild post " + operation + " of "
+ + dataTableFullName);
+ }
+ }
+ return status;
}
private void modifyTable(Admin admin, String dataTableFullName, HashSet<String> indexes)
@@ -411,13 +426,23 @@
}
private void rebuildIndexes(Connection conn, Configuration conf, String dataTableFullName) {
- if (upgrade) {
- prepareToRebuildIndexes(conn, dataTableFullName);
+ try {
+ HashMap<String, IndexInfo> rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName);
+
+ //for rebuilding indexes in case of upgrade and if there are indexes on the table/view.
+ if (rebuildMap.isEmpty()) {
+ LOGGER.info("No indexes to rebuild for table " + dataTableFullName);
+ return;
+ }
if(!test) {
indexingTool = new IndexTool();
+ indexingTool.setConf(conf);
}
- indexingTool.setConf(conf);
- rebuildIndexes(conn, dataTableFullName, indexingTool);
+ startIndexRebuilds(conn, dataTableFullName, rebuildMap, indexingTool);
+
+ } catch (SQLException e) {
+ LOGGER.severe("Failed to prepare the map for index rebuilds " + e);
+ throw new RuntimeException("Failed to prepare the map for index rebuilds");
}
}
@@ -477,10 +502,15 @@
}
}
}
- private int rebuildIndexes(Connection conn, String dataTable, IndexTool indexingTool) {
- for(Map.Entry<String, IndexInfo> indexMap : rebuildMap.get(dataTable).entrySet()) {
- String index = indexMap.getKey();
- IndexInfo indexInfo = indexMap.getValue();
+
+ private int startIndexRebuilds(Connection conn,
+ String dataTable,
+ HashMap<String, IndexInfo> indexInfos,
+ IndexTool indexingTool) {
+
+ for(Map.Entry<String, IndexInfo> entry : indexInfos.entrySet()) {
+ String index = entry.getKey();
+ IndexInfo indexInfo = entry.getValue();
String indexName = SchemaUtil.getTableNameFromFullName(index);
String tenantId = indexInfo.getTenantId();
String baseTable = indexInfo.getBaseTable();
@@ -588,65 +618,53 @@
}
}
- private void prepareToRebuildIndexes(Connection conn, String dataTableFullName) {
- try {
- Gson gson = new Gson();
- HashMap<String, IndexInfo> rebuildIndexes = new HashMap<>();
- HashSet<String> physicalIndexes = tablesAndIndexes.get(dataTableFullName);
+ private HashMap<String, IndexInfo> prepareToRebuildIndexes(Connection conn,
+ String dataTableFullName) throws SQLException {
- String viewIndexPhysicalName = MetaDataUtil
- .getViewIndexPhysicalName(dataTableFullName);
- boolean hasViewIndex = physicalIndexes.contains(viewIndexPhysicalName);
- String schemaName = SchemaUtil.getSchemaNameFromFullName(dataTableFullName);
- String tableName = SchemaUtil.getTableNameFromFullName(dataTableFullName);
+ HashMap<String, IndexInfo> indexInfos = new HashMap<>();
+ HashSet<String> physicalIndexes = tablesAndIndexes.get(dataTableFullName);
- for (String physicalIndexName : physicalIndexes) {
- if (physicalIndexName.equals(viewIndexPhysicalName)) {
- continue;
- }
- String indexTableName = SchemaUtil.getTableNameFromFullName(physicalIndexName);
- String pIndexName = SchemaUtil.getTableName(schemaName, indexTableName);
- IndexInfo indexInfo = new IndexInfo(schemaName, tableName,
- GLOBAL_INDEX_ID, pIndexName, pIndexName);
- rebuildIndexes.put(physicalIndexName, indexInfo);
+ String viewIndexPhysicalName = MetaDataUtil
+ .getViewIndexPhysicalName(dataTableFullName);
+ boolean hasViewIndex = physicalIndexes.contains(viewIndexPhysicalName);
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(dataTableFullName);
+ String tableName = SchemaUtil.getTableNameFromFullName(dataTableFullName);
+
+ for (String physicalIndexName : physicalIndexes) {
+ if (physicalIndexName.equals(viewIndexPhysicalName)) {
+ continue;
}
-
- if (hasViewIndex) {
- String viewSql = "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM "
- + "SYSTEM.CATALOG "
- + "WHERE COLUMN_FAMILY = \'" + dataTableFullName + "\' "
- + (!StringUtil.EMPTY_STRING.equals(schemaName) ? "AND TABLE_SCHEM = \'"
- + schemaName + "\' " : "")
- + "AND LINK_TYPE = "
- + PTable.LinkType.PHYSICAL_TABLE.getSerializedValue();
-
- ResultSet rs = conn.createStatement().executeQuery(viewSql);
-
- while (rs.next()) {
- String viewName = rs.getString(1);
- String tenantId = rs.getString(2);
- ArrayList<String> viewIndexes = findViewIndexes(conn, schemaName, viewName,
- tenantId);
- for (String viewIndex : viewIndexes) {
- IndexInfo indexInfo = new IndexInfo(schemaName, viewName,
- tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex, viewIndexPhysicalName);
- rebuildIndexes.put(viewIndex, indexInfo);
- }
- }
- }
- //for rebuilding indexes in case of upgrade and if there are indexes on the table/view.
- if (!rebuildIndexes.isEmpty()) {
- rebuildMap.put(dataTableFullName, rebuildIndexes);
- String json = gson.toJson(rebuildMap);
- LOGGER.info("Index rebuild map " + json);
- } else {
- LOGGER.info("No indexes to rebuild for table " + dataTableFullName);
- }
-
- } catch (SQLException e) {
- LOGGER.severe("Failed to prepare the map for index rebuilds " + e);
- throw new RuntimeException("Failed to prepare the map for index rebuilds");
+ String indexTableName = SchemaUtil.getTableNameFromFullName(physicalIndexName);
+ String pIndexName = SchemaUtil.getTableName(schemaName, indexTableName);
+ IndexInfo indexInfo = new IndexInfo(schemaName, tableName,
+ GLOBAL_INDEX_ID, pIndexName, pIndexName);
+ indexInfos.put(physicalIndexName, indexInfo);
}
+
+ if (hasViewIndex) {
+ String viewSql = "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM "
+ + "SYSTEM.CATALOG "
+ + "WHERE COLUMN_FAMILY = \'" + dataTableFullName + "\' "
+ + (!StringUtil.EMPTY_STRING.equals(schemaName) ? "AND TABLE_SCHEM = \'"
+ + schemaName + "\' " : "")
+ + "AND LINK_TYPE = "
+ + PTable.LinkType.PHYSICAL_TABLE.getSerializedValue();
+
+ ResultSet rs = conn.createStatement().executeQuery(viewSql);
+
+ while (rs.next()) {
+ String viewName = rs.getString(1);
+ String tenantId = rs.getString(2);
+ ArrayList<String> viewIndexes = findViewIndexes(conn, schemaName, viewName,
+ tenantId);
+ for (String viewIndex : viewIndexes) {
+ IndexInfo indexInfo = new IndexInfo(schemaName, viewName,
+ tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex, viewIndexPhysicalName);
+ indexInfos.put(viewIndex, indexInfo);
+ }
+ }
+ }
+ return indexInfos;
}
private ArrayList<String> findViewIndexes(Connection conn, String schemaName, String viewName,