| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.coprocessor; |
| |
| import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.TimerTask; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.CoprocessorEnvironment; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.client.HTableInterface; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.filter.CompareFilter; |
| import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.Pair; |
| |
| import org.apache.phoenix.cache.GlobalCache; |
| import org.apache.phoenix.compile.MutationPlan; |
| import org.apache.phoenix.compile.PostDDLCompiler; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.hbase.index.util.IndexManagementUtil; |
| import org.apache.phoenix.index.IndexMaintainer; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.jdbc.PhoenixDriver; |
| import org.apache.phoenix.query.QueryConstants; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PName; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.SortOrder; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.schema.types.PLong; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.MetaDataUtil; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.ScanUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.ServerUtil; |
| import org.apache.phoenix.util.UpgradeUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| |
| /** |
| * Coprocessor for metadata-related operations. This coprocessor is only registered |
| * on the SYSTEM.CATALOG table. |
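| * <p> |
| * As an illustrative sketch only (Phoenix wires this observer up internally when it |
| * creates SYSTEM.CATALOG; the snippet below is not the actual registration path): |
| * <pre>{@code |
| * HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf("SYSTEM.CATALOG")); |
| * desc.addCoprocessor(MetaDataRegionObserver.class.getName(), null, |
| *         Coprocessor.PRIORITY_USER, null); |
| * admin.modifyTable(desc.getTableName(), desc); |
| * }</pre> |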
| */ |
| @SuppressWarnings("deprecation") |
| public class MetaDataRegionObserver extends BaseRegionObserver { |
| public static final Logger LOGGER = LoggerFactory.getLogger(MetaDataRegionObserver.class); |
| public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = "REBUILDINDEX"; |
| // PHOENIX-5094: To differentiate increments to PENDING_DISABLE_COUNT made by a client |
| // from those made by the index rebuilder, the rebuilder uses a large increment value |
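| // For example (illustrative values): since client-side increments are small (one per |
| // failure event), a counter of 10002 implies the rebuilder's 10000 plus 2 client bumps |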
| public static final long PENDING_DISABLE_INACTIVE_STATE_COUNT = 10000L; |
| private static final byte[] SYSTEM_CATALOG_KEY = SchemaUtil.getTableKey( |
| ByteUtil.EMPTY_BYTE_ARRAY, |
| QueryConstants.SYSTEM_SCHEMA_NAME_BYTES, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES); |
| protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); |
| private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; |
| private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; |
| private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>(); |
| @GuardedBy("MetaDataRegionObserver.class") |
| private static Properties rebuildIndexConnectionProps; |
| // Added for test purposes |
| private long initialRebuildTaskDelay; |
| |
| @Override |
| public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, |
| boolean abortRequested) { |
| executor.shutdownNow(); |
| GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll(); |
| } |
| |
| @Override |
| public void start(CoprocessorEnvironment env) throws IOException { |
| // Sleep a little to compensate for clock skew when SYSTEM.CATALOG moves |
| // among region servers, because we rely on the server time of the RS that is |
| // hosting SYSTEM.CATALOG |
| Configuration config = env.getConfiguration(); |
| long sleepTime = config.getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, |
| QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL); |
| try { |
| if(sleepTime > 0) { |
| Thread.sleep(sleepTime); |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| enableRebuildIndex = |
| config.getBoolean( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); |
| rebuildIndexTimeInterval = |
| config.getLong( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); |
| initialRebuildTaskDelay = |
| config.getLong( |
| QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY); |
| } |
| |
| @Override |
| public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { |
| final RegionCoprocessorEnvironment env = e.getEnvironment(); |
| |
| Runnable r = new Runnable() { |
| @Override |
| public void run() { |
| HTableInterface metaTable = null; |
| HTableInterface statsTable = null; |
| try { |
| ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator()); |
| Thread.sleep(1000); |
| metaTable = env.getTable( |
| SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props)); |
| statsTable = env.getTable( |
| SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, props)); |
| final HTableInterface mTable = metaTable; |
| final HTableInterface sTable = statsTable; |
| User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| if (UpgradeUtil.truncateStats(mTable, sTable)) { |
| LOGGER.info("Stats are successfully truncated for upgrade 4.7!!"); |
| } |
| return null; |
| } |
| }); |
| |
| } catch (Exception exception) { |
| LOGGER.warn("Exception while truncate stats.., please check and delete stats " + |
| "manually inorder to get proper result with old client!!"); |
| LOGGER.warn(exception.getStackTrace().toString()); |
| } finally { |
| try { |
| if (metaTable != null) { |
| metaTable.close(); |
| } |
| if (statsTable != null) { |
| statsTable.close(); |
| } |
| } catch (IOException e) { |
| // ignore; nothing more we can do if close fails |
| } |
| } |
| } |
| }; |
| Thread t = new Thread(r); |
| t.setDaemon(true); |
| t.start(); |
| |
| if (!enableRebuildIndex) { |
| LOGGER.info("Failure Index Rebuild is skipped by configuration."); |
| return; |
| } |
| // Ensure we only run one of the index rebuilder tasks |
| if (ServerUtil.isKeyInRegion(SYSTEM_CATALOG_KEY, e.getEnvironment().getRegion())) { |
| try { |
| Class.forName(PhoenixDriver.class.getName()); |
| initRebuildIndexConnectionProps(e.getEnvironment().getConfiguration()); |
| // start the scheduled index rebuild task |
| BuildIndexScheduleTask task = new BuildIndexScheduleTask(e.getEnvironment()); |
| executor.scheduleWithFixedDelay(task, initialRebuildTaskDelay, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); |
| } catch (ClassNotFoundException ex) { |
| LOGGER.error("BuildIndexScheduleTask cannot start!", ex); |
| } |
| } |
| } |
| |
| /** |
| * Task that runs periodically to partially rebuild indexes whose |
| * INDEX_DISABLE_TIMESTAMP is non-zero, i.e. indexes that saw a write failure. |
| * |
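| * <p> |
| * Minimal scheduling sketch, mirroring what {@link MetaDataRegionObserver#postOpen} |
| * does (the delay and interval values below are placeholders): |
| * <pre>{@code |
| * BuildIndexScheduleTask task = new BuildIndexScheduleTask(env); // or pass a table list |
| * executor.scheduleWithFixedDelay(task, 10000L, 60000L, TimeUnit.MILLISECONDS); |
| * }</pre> |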
| */ |
| public static class BuildIndexScheduleTask extends TimerTask { |
| RegionCoprocessorEnvironment env; |
| private final long rebuildIndexBatchSize; |
| private final long configuredBatches; |
| private final long indexDisableTimestampThreshold; |
| private final long pendingDisableThreshold; |
| private final ReadOnlyProps props; |
| private final List<String> onlyTheseTables; |
| |
| public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) { |
| this(env, null); |
| } |
| |
| public BuildIndexScheduleTask(RegionCoprocessorEnvironment env, List<String> onlyTheseTables) { |
| this.onlyTheseTables = onlyTheseTables == null ? null : ImmutableList.copyOf(onlyTheseTables); |
| this.env = env; |
| Configuration configuration = env.getConfiguration(); |
| this.rebuildIndexBatchSize = configuration.getLong( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP); |
| this.configuredBatches = configuration.getLong( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, 10); |
| this.indexDisableTimestampThreshold = |
| configuration.getLong(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD); |
| this.pendingDisableThreshold = |
| configuration.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, |
| QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); |
| this.props = new ReadOnlyProps(env.getConfiguration().iterator()); |
| } |
| |
| public List<PTable> decrementIndexesPendingDisableCount(PhoenixConnection conn, PTable dataPTable, List<PTable> indexes) { |
| List<PTable> indexesDecremented = new ArrayList<>(); |
| for (PTable index : indexes) { |
| try { |
| String indexName = index.getName().getString(); |
| IndexUtil.incrementCounterForIndex(conn, indexName, -PENDING_DISABLE_INACTIVE_STATE_COUNT); |
| indexesDecremented.add(index); |
| } catch (Exception e) { |
| LOGGER.warn("Unable to decrement PENDING_DISABLE_COUNT by " + PENDING_DISABLE_INACTIVE_STATE_COUNT |
| + " for index " + index.getName().getString() + " of table " |
| + dataPTable.getName().getString(), e); |
| } |
| } |
| return indexesDecremented; |
| } |
| |
| @Override |
| public void run() { |
| // FIXME: we should replay the data table Put, as doing a partial index build would only add |
| // the new rows and not delete the previous index value. Also, we should restrict the scan |
| // to only data within this region (as otherwise *every* region will be running this code |
| // separately, all updating the same data). |
| RegionScanner scanner = null; |
| PhoenixConnection conn = null; |
| try { |
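| // Scan this SYSTEM.CATALOG region for header rows whose INDEX_DISABLE_TIMESTAMP |
| // is present and non-zero, fetching only the columns needed to decide whether a |
| // partial rebuild is due. |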
| Scan scan = new Scan(); |
| SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, |
| CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L)); |
| filter.setFilterIfMissing(true); |
| scan.setFilter(filter); |
| scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.TABLE_NAME_BYTES); |
| scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); |
| scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.INDEX_STATE_BYTES); |
| scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); |
| |
| Map<PTable, List<Pair<PTable,Long>>> dataTableToIndexesMap = null; |
| boolean hasMore = false; |
| List<Cell> results = new ArrayList<Cell>(); |
| scanner = this.env.getRegion().getScanner(scan); |
| |
| do { |
| results.clear(); |
| hasMore = scanner.next(results); |
| if (results.isEmpty()) { |
| LOGGER.debug("Found no indexes with non zero INDEX_DISABLE_TIMESTAMP"); |
| break; |
| } |
| |
| Result r = Result.create(results); |
| byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); |
| Cell indexStateCell = r.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); |
| |
| if (disabledTimeStamp == null || disabledTimeStamp.length == 0) { |
| LOGGER.debug("Null or empty INDEX_DISABLE_TIMESTAMP"); |
| continue; |
| } |
| |
| long indexDisableTimestamp = |
| PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, |
| SortOrder.ASC); |
| byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, |
| PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); |
| if ((dataTable == null || dataTable.length == 0) || indexStateCell == null) { |
| // the data table name can't be empty, and we need the index state |
| LOGGER.debug("Null or empty data table name, or missing index state"); |
| continue; |
| } |
| |
| byte[] indexStateBytes = CellUtil.cloneValue(indexStateCell); |
| byte[][] rowKeyMetaData = new byte[3][]; |
| SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData); |
| byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] indexTable = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| |
| // validity check |
| if (indexTable == null || indexTable.length == 0) { |
| LOGGER.debug("We find IndexTable empty during rebuild scan:" + scan |
| + "so, Index rebuild has been skipped for row=" + r); |
| continue; |
| } |
| |
| String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable); |
| if (onlyTheseTables != null && !onlyTheseTables.contains(dataTableFullName)) { |
| LOGGER.debug("Could not find " + dataTableFullName + |
| " in " + onlyTheseTables); |
| continue; |
| } |
| |
| if (conn == null) { |
| conn = getRebuildIndexConnection(env.getConfiguration()); |
| dataTableToIndexesMap = Maps.newHashMap(); |
| } |
| PTable dataPTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); |
| |
| String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable); |
| PTable indexPTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName); |
| // Sanity check in case index was removed from table |
| if (!dataPTable.getIndexes().contains(indexPTable)) { |
| LOGGER.debug(dataTableFullName + " does not contain " + |
| indexPTable.getName().getString()); |
| continue; |
| } |
| |
| PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]); |
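| // INDEX_DISABLE_TIMESTAMP may be stored negated (the sign encodes whether data |
| // table writes remain unblocked; see the sign handling further below), so take |
| // the absolute value when computing elapsed time |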
| long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp); |
| |
| // on an index write failure, the server side transitions to PENDING_DISABLE, then the client |
| // retries, and after retries are exhausted, disables the index |
| if (indexState == PIndexState.PENDING_DISABLE) { |
| if (elapsedSinceDisable > pendingDisableThreshold) { |
| // too long in PENDING_DISABLE - client didn't disable the index, so we do it here |
| IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp); |
| } |
| continue; |
| } |
| |
| // Only perform relatively expensive check for all regions online when index |
| // is disabled or pending active since that's the state it's placed into when |
| // an index write fails. |
| if ((indexState == PIndexState.DISABLE || indexState == PIndexState.PENDING_ACTIVE) |
| && !MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) { |
| LOGGER.debug("Index rebuild has been skipped because not all regions of" + |
| " index table=" + indexPTable.getName() + " are online."); |
| continue; |
| } |
| |
| if (elapsedSinceDisable > indexDisableTimestampThreshold) { |
| /* |
| * It has been too long since the index has been disabled and any future |
| * attempts to reenable it likely will fail. So we are going to mark the |
| * index as disabled and set the index disable timestamp to 0 so that the |
| * rebuild task won't pick up this index again for rebuild. |
| */ |
| try { |
| IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, 0L); |
| LOGGER.error("Unable to rebuild index " + indexTableFullName |
| + ". Won't attempt again since index disable timestamp is" + |
| " older than current time by " + indexDisableTimestampThreshold |
| + " milliseconds. Manual intervention needed to re-build" + |
| " the index"); |
| } catch (Throwable ex) { |
| LOGGER.error( |
| "Unable to mark index " + indexTableFullName + " as disabled.", ex); |
| } |
| continue; // don't attempt another rebuild irrespective of whether |
| // updateIndexState worked or not |
| } |
| // Allow index to begin incremental maintenance as index is back online and we |
| // cannot transition directly from DISABLED -> ACTIVE |
| if (indexState == PIndexState.DISABLE) { |
| if (IndexUtil.getIndexPendingDisableCount(conn, indexTableFullName) < PENDING_DISABLE_INACTIVE_STATE_COUNT) { |
| // avoid incrementing the counter a second time |
| IndexUtil.incrementCounterForIndex(conn, indexTableFullName, PENDING_DISABLE_INACTIVE_STATE_COUNT); |
| } |
| IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null); |
| continue; // Must wait until clients start to do index maintenance again |
| } else if (indexState == PIndexState.PENDING_ACTIVE) { |
| IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, null); |
| continue; // Must wait until clients start to do index maintenance again |
| } else if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) { |
| LOGGER.warn("Unexpected index state of " + indexTableFullName + "=" |
| + indexState + ". Skipping partial rebuild attempt."); |
| continue; |
| } |
| long currentTime = EnvironmentEdgeManager.currentTimeMillis(); |
| long forwardOverlapDurationMs = env.getConfiguration().getLong( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME); |
| // Wait until no failures have occurred in at least forwardOverlapDurationMs |
| if (indexStateCell.getTimestamp() + forwardOverlapDurationMs > currentTime) { |
| LOGGER.debug("Still must wait " + (indexStateCell.getTimestamp() + |
| forwardOverlapDurationMs - currentTime) + |
| " before starting rebuild for " + indexTableFullName); |
| continue; // Haven't waited long enough yet |
| } |
| Long upperBoundOfRebuild = indexStateCell.getTimestamp() + forwardOverlapDurationMs; |
| // Pass in upperBoundOfRebuild when setting index state or increasing disable ts |
| // and fail if index timestamp > upperBoundOfRebuild. |
| List<Pair<PTable,Long>> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable); |
| if (indexesToPartiallyRebuild == null) { |
| indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size()); |
| dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild); |
| } |
| LOGGER.debug("We have found " + indexPTable.getIndexState() + " Index:" + |
| indexPTable.getName() + " on data table:" + dataPTable.getName() + |
| " which failed to be updated at " |
| + indexPTable.getIndexDisableTimestamp()); |
| indexesToPartiallyRebuild.add(new Pair<PTable,Long>(indexPTable,upperBoundOfRebuild)); |
| } while (hasMore); |
| |
| if (dataTableToIndexesMap != null) { |
| long backwardOverlapDurationMs = env.getConfiguration().getLong( |
| QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME_ATTRIB, |
| env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME)); |
| for (Map.Entry<PTable, List<Pair<PTable,Long>>> entry : dataTableToIndexesMap.entrySet()) { |
| PTable dataPTable = entry.getKey(); |
| List<Pair<PTable,Long>> pairs = entry.getValue(); |
| List<PTable> indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(pairs.size()); |
| try ( |
| HTableInterface metaTable = env.getTable( |
| SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) { |
| long earliestDisableTimestamp = Long.MAX_VALUE; |
| long latestUpperBoundTimestamp = Long.MIN_VALUE; |
| List<IndexMaintainer> maintainers = Lists |
| .newArrayListWithExpectedSize(pairs.size()); |
| int signOfDisableTimeStamp = 0; |
| for (Pair<PTable,Long> pair : pairs) { |
| // We need a way of differentiating the block writes to data table case from |
| // the leave index active case. In either case, we need to know the time stamp |
| // at which writes started failing so we can rebuild from that point. If we |
| // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES, |
| // then writes to the data table will be blocked (this is client side logic |
| // and we can't change this in a minor release). So we use the sign of the |
| // time stamp to differentiate. |
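| // e.g. (illustrative): -1500000000000L records a failure at t=1500000000000 |
| // while leaving the index active and data table writes unblocked; the positive |
| // value would additionally block writes to the data table |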
| PTable index = pair.getFirst(); |
| Long upperBoundTimestamp = pair.getSecond(); |
| long disabledTimeStampVal = index.getIndexDisableTimestamp(); |
| if (disabledTimeStampVal != 0) { |
| if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) { |
| LOGGER.warn("Found unexpected mix of signs with " + |
| "INDEX_DISABLE_TIMESTAMP for " + |
| dataPTable.getName().getString() + " with " + |
| indexesToPartiallyRebuild); |
| } |
| signOfDisableTimeStamp = Long.signum(disabledTimeStampVal); |
| disabledTimeStampVal = Math.abs(disabledTimeStampVal); |
| if (disabledTimeStampVal < earliestDisableTimestamp) { |
| earliestDisableTimestamp = disabledTimeStampVal; |
| } |
| |
| indexesToPartiallyRebuild.add(index); |
| maintainers.add(index.getIndexMaintainer(dataPTable, conn)); |
| } |
| if (upperBoundTimestamp > latestUpperBoundTimestamp) { |
| latestUpperBoundTimestamp = upperBoundTimestamp; |
| } |
| } |
| // No indexes are disabled, so skip this table |
| if (earliestDisableTimestamp == Long.MAX_VALUE) { |
| LOGGER.debug("No indexes are disabled so continuing"); |
| continue; |
| } |
| long scanBeginTime = Math.max(0, earliestDisableTimestamp - backwardOverlapDurationMs); |
| long scanEndTime = Math.min(latestUpperBoundTimestamp, |
| getTimestampForBatch(scanBeginTime,batchExecutedPerTableMap.get(dataPTable.getName()))); |
| LOGGER.info("Starting to build " + dataPTable + " indexes " |
| + indexesToPartiallyRebuild + " from timestamp=" + |
| scanBeginTime + " until " + scanEndTime); |
| |
| TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false); |
| // TODO Need to set high timeout |
| PostDDLCompiler compiler = new PostDDLCompiler(conn); |
| MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, scanEndTime); |
| Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), maintainers); |
| |
| dataTableScan.setTimeRange(scanBeginTime, scanEndTime); |
| dataTableScan.setCacheBlocks(false); |
| dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); |
| |
| ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( |
| ByteUtil.EMPTY_BYTE_ARRAY); |
| IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, |
| conn); |
| byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); |
| dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); |
| ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION); |
| LOGGER.info("Starting to partially build indexes:" + |
| indexesToPartiallyRebuild + " on data table:" + |
| dataPTable.getName() + " with the earliest disable timestamp:" |
| + earliestDisableTimestamp + " till " |
| + (scanEndTime == HConstants.LATEST_TIMESTAMP ? |
| "LATEST_TIMESTAMP" : scanEndTime)); |
| MutationState mutationState = plan.execute(); |
| long rowCount = mutationState.getUpdateCount(); |
| decrementIndexesPendingDisableCount(conn, dataPTable, indexesToPartiallyRebuild); |
| if (scanEndTime == latestUpperBoundTimestamp) { |
| LOGGER.info("Rebuild completed for all inactive/disabled indexes" + |
| " in data table:" + dataPTable.getName()); |
| } |
| LOGGER.info(" no. of datatable rows read in rebuilding process is " |
| + rowCount); |
| for (PTable indexPTable : indexesToPartiallyRebuild) { |
| String indexTableFullName = SchemaUtil.getTableName( |
| indexPTable.getSchemaName().getString(), |
| indexPTable.getTableName().getString()); |
| try { |
| if (scanEndTime == latestUpperBoundTimestamp) { |
| IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 0L, |
| latestUpperBoundTimestamp); |
| batchExecutedPerTableMap.remove(dataPTable.getName()); |
| LOGGER.info("Making Index:" + indexPTable.getTableName() + |
| " active after rebuilding"); |
| } else { |
| // Increment timestamp so that client sees updated disable timestamp |
| IndexUtil.updateIndexState(conn, indexTableFullName, indexPTable.getIndexState(), |
| scanEndTime * signOfDisableTimeStamp, latestUpperBoundTimestamp); |
| Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName()); |
| if (noOfBatches == null) { |
| noOfBatches = 0L; |
| } |
| batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches); |
| LOGGER.info( |
| "During Round-robin build: Successfully updated " + |
| "index disabled timestamp for " |
| + indexTableFullName + " to " + scanEndTime); |
| } |
| } catch (SQLException e) { |
| LOGGER.error("Unable to rebuild " + dataPTable + " index " + |
| indexTableFullName, e); |
| } |
| } |
| } catch (Exception e) { |
| LOGGER.error("Unable to rebuild " + dataPTable + " indexes " + |
| indexesToPartiallyRebuild, e); |
| } |
| } |
| } |
| } catch (Throwable t) { |
| LOGGER.warn("ScheduledBuildIndexTask failed!", t); |
| } finally { |
| if (scanner != null) { |
| try { |
| scanner.close(); |
| } catch (IOException ignored) { |
| LOGGER.debug("ScheduledBuildIndexTask can't close scanner.", ignored); |
| } |
| } |
| if (conn != null) { |
| try { |
| conn.close(); |
| } catch (SQLException ignored) { |
| LOGGER.debug("ScheduledBuildIndexTask can't close connection", ignored); |
| } |
| } |
| } |
| } |
| |
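| /** |
| * Computes the upper time bound for the next rebuild batch. Worked example with |
| * illustrative numbers: disabledTimeStamp = 1000 and rebuildIndexBatchSize = 500 |
| * yield 1500; if that crosses the current time, overflows, or the table has already |
| * executed more than configuredBatches batches, LATEST_TIMESTAMP is returned so the |
| * remainder is rebuilt in a single pass. |
| */ |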
| private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) { |
| if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP |
| - disabledTimeStamp)) { return HConstants.LATEST_TIMESTAMP; } |
| long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize; |
| if (timestampForNextBatch < 0 || timestampForNextBatch > EnvironmentEdgeManager.currentTimeMillis() |
| || (noOfBatches != null && noOfBatches > configuredBatches)) { |
| // if timestampForNextBatch crosses the current time, or we have exceeded the |
| // configured number of batches, rebuild the remainder of the index in one pass |
| timestampForNextBatch = HConstants.LATEST_TIMESTAMP; |
| } |
| return timestampForNextBatch; |
| } |
| } |
| |
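| /** |
| * Lazily builds the connection properties used by the index rebuild task: the Phoenix |
| * thread timeout and the HBase scanner/RPC timeouts and retry count are widened so |
| * long-running rebuild scans are not killed prematurely. Only the first call per |
| * classload populates {@link #rebuildIndexConnectionProps}. |
| */ |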
| @VisibleForTesting |
| public static synchronized void initRebuildIndexConnectionProps(Configuration config) { |
| if (rebuildIndexConnectionProps == null) { |
| Properties props = new Properties(); |
| long indexRebuildQueryTimeoutMs = |
| config.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); |
| long indexRebuildRPCTimeoutMs = |
| config.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT); |
| long indexRebuildClientScannerTimeOutMs = |
| config.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT); |
| int indexRebuildRpcRetriesCounter = |
| config.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER); |
| // Set various phoenix and hbase level timeouts and rpc retries |
| props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, |
| Long.toString(indexRebuildQueryTimeoutMs)); |
| props.setProperty(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, |
| Long.toString(indexRebuildClientScannerTimeOutMs)); |
| props.setProperty(HConstants.HBASE_RPC_TIMEOUT_KEY, |
| Long.toString(indexRebuildRPCTimeoutMs)); |
| props.setProperty(HConstants.HBASE_CLIENT_RETRIES_NUMBER, |
| Long.toString(indexRebuildRpcRetriesCounter)); |
| // don't run a second index population UPSERT SELECT |
| props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); |
| rebuildIndexConnectionProps = PropertiesUtil.combineProperties(props, config); |
| } |
| } |
| |
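| /** |
| * Returns a server-side connection tuned for index rebuilds. A minimal usage sketch |
| * (the caller owns and must close the connection): |
| * <pre>{@code |
| * try (PhoenixConnection conn = getRebuildIndexConnection(env.getConfiguration())) { |
| *     // run rebuild metadata queries/updates here |
| * } |
| * }</pre> |
| */ |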
| public static PhoenixConnection getRebuildIndexConnection(Configuration config) |
| throws SQLException, ClassNotFoundException { |
| initRebuildIndexConnectionProps(config); |
| return QueryUtil.getConnectionOnServerWithCustomUrl(rebuildIndexConnectionProps, |
| REBUILD_INDEX_APPEND_TO_URL_STRING).unwrap(PhoenixConnection.class); |
| } |
| } |