/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.index;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
/**
*
* Handler called in the event that index updates cannot be written to their
 * region server. First attempts to disable the index and, failing that, falls
* back to the default behavior of killing the region server.
*
*/
public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
public static final String THROW_INDEX_WRITE_FAILURE = "THROW_INDEX_WRITE_FAILURE";
public static final String DISABLE_INDEX_ON_WRITE_FAILURE = "DISABLE_INDEX_ON_WRITE_FAILURE";
public static final String REBUILD_INDEX_ON_WRITE_FAILURE = "REBUILD_INDEX_ON_WRITE_FAILURE";
public static final String BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE = "BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE";
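    // Knobs resolved in setup(): the table descriptor properties above, when set, override the
    // cluster configuration.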
private RegionCoprocessorEnvironment env;
private boolean blockDataTableWritesOnFailure;
private boolean disableIndexOnFailure;
private boolean rebuildIndexOnFailure;
private boolean throwIndexWriteFailure;
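    /**
     * Defaults to {@link KillServerOnFailurePolicy}; {@link #setup(Stoppable, RegionCoprocessorEnvironment)}
     * may swap in a {@link LeaveIndexActiveFailurePolicy} delegate based on configuration.
     */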
public PhoenixIndexFailurePolicy() {
super(new KillServerOnFailurePolicy());
}
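    /**
     * Resolves the failure-handling knobs for this region: a table descriptor property, when
     * present, overrides the corresponding cluster-wide configuration value (the rebuild
     * property is only consulted when index rebuild is enabled globally).
     */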
@Override
public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
super.setup(parent, env);
this.env = env;
rebuildIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
HTableDescriptor htd = env.getRegion().getTableDesc();
        // If index rebuild is turned off globally, there's no need to check the table property
        // because the background rebuild thread won't be running in that case
if (rebuildIndexOnFailure) {
String value = htd.getValue(REBUILD_INDEX_ON_WRITE_FAILURE);
if (value != null) {
rebuildIndexOnFailure = Boolean.parseBoolean(value);
}
}
disableIndexOnFailure = getDisableIndexOnFailure(env);
String value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE);
if (value == null) {
blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
} else {
blockDataTableWritesOnFailure = Boolean.parseBoolean(value);
}
value = htd.getValue(THROW_INDEX_WRITE_FAILURE);
if (value == null) {
throwIndexWriteFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_THROW_EXCEPTION_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_THROW_EXCEPTION);
} else {
throwIndexWriteFailure = Boolean.parseBoolean(value);
}
boolean killServer = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_KILL_SERVER, true);
if (!killServer) {
setDelegate(new LeaveIndexActiveFailurePolicy());
} // else, default in constructor is KillServerOnFailurePolicy
}
/**
* Attempt to disable the index table when we can't write to it, preventing future updates until the index is
* brought up to date, but allowing historical reads to continue until then.
* <p>
* In the case that we cannot reach the metadata information, we will fall back to the default policy and kill
* this server, so we can attempt to replay the edits on restart.
* </p>
* @param attempted the mutations that were attempted to be written and the tables to which they were written
* @param cause root cause of the failure
*/
@Override
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
boolean throwing = true;
long timestamp = HConstants.LATEST_TIMESTAMP;
try {
timestamp = handleFailureWithExceptions(attempted, cause);
throwing = false;
} catch (Throwable t) {
LOG.warn("handleFailure failed", t);
super.handleFailure(attempted, cause);
throwing = false;
} finally {
if (!throwing) {
SQLException sqlException =
new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE)
.setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
.buildException();
IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp);
Mutation m = attempted.entries().iterator().next().getValue();
boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
// Always throw if rebuilding index since the rebuilder needs to know if it was successful
if (throwIndexWriteFailure || isIndexRebuild) {
throw ioException;
} else {
LOG.warn("Swallowing index write failure", ioException);
}
}
}
}
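    /**
     * Computes the minimum cell timestamp across the attempted mutations, resolves the names of
     * the failed indexes (translating local index writes on the data table into local index
     * names), and transitions each one to PENDING_DISABLE (or PENDING_ACTIVE when the index is
     * left active) in SYSTEM.CATALOG. Returns that timestamp so the caller can attach it to the
     * exception it throws.
     */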
private long handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
Exception cause) throws Throwable {
Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
// start by looking at all the tables to which we attempted to write
long timestamp = 0;
boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
// if using TrackingParallelWriter, we know which indexes failed and only disable those
Set<HTableInterfaceReference> failedTables = cause instanceof MultiIndexWriteFailureException
? new HashSet<HTableInterfaceReference>(((MultiIndexWriteFailureException)cause).getFailedTables())
: Collections.<HTableInterfaceReference>emptySet();
for (HTableInterfaceReference ref : refs) {
            if (!failedTables.isEmpty() && !failedTables.contains(ref)) {
continue; // leave index active if its writes succeeded
}
long minTimeStamp = 0;
// get the minimum timestamp across all the mutations we attempted on that table
// FIXME: all cell timestamps should be the same
Collection<Mutation> mutations = attempted.get(ref);
if (mutations != null) {
for (Mutation m : mutations) {
for (List<Cell> kvs : m.getFamilyCellMap().values()) {
for (Cell kv : kvs) {
if (minTimeStamp == 0 || (kv.getTimestamp() >= 0 && minTimeStamp > kv.getTimestamp())) {
minTimeStamp = kv.getTimestamp();
}
}
}
}
}
timestamp = minTimeStamp;
// If the data table has local index column families then get local indexes to disable.
if (ref.getTableName().equals(env.getRegion().getTableDesc().getNameAsString())
&& MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) {
for (String tableName : getLocalIndexNames(ref, mutations)) {
indexTableNames.put(tableName, minTimeStamp);
}
                // the client disables the index, so we pass the failed index names back in the thrown exception
if (cause instanceof MultiIndexWriteFailureException) {
List<HTableInterfaceReference> failedLocalIndexes =
Lists.newArrayList(Iterables.transform(indexTableNames.entrySet(),
new Function<Map.Entry<String, Long>, HTableInterfaceReference>() {
@Override
public HTableInterfaceReference apply(Entry<String, Long> input) {
return new HTableInterfaceReference(new ImmutableBytesPtr(
Bytes.toBytes(input.getKey())));
}
}));
((MultiIndexWriteFailureException) cause).setFailedTables(failedLocalIndexes);
}
} else {
indexTableNames.put(ref.getTableName(), minTimeStamp);
}
}
// Nothing to do if we're not disabling the index and not rebuilding on failure
if (!disableIndexOnFailure && !rebuildIndexOnFailure) {
return timestamp;
}
PIndexState newState = disableIndexOnFailure ? PIndexState.PENDING_DISABLE : PIndexState.PENDING_ACTIVE;
        // for all the index tables that we've found, try to disable them; if that fails, fall
        // back to the delegate failure policy (by default, killing the server)
        for (Map.Entry<String, Long> tableTimeElement : indexTableNames.entrySet()) {
String indexTableName = tableTimeElement.getKey();
long minTimeStamp = tableTimeElement.getValue();
// We need a way of differentiating the block writes to data table case from
// the leave index active case. In either case, we need to know the time stamp
// at which writes started failing so we can rebuild from that point. If we
// keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
// then writes to the data table will be blocked (this is client side logic
// and we can't change this in a minor release). So we use the sign of the
// time stamp to differentiate.
if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
minTimeStamp *= -1;
}
            // Disable the index by using the updateIndexState method of the MetaDataProtocol endpoint coprocessor.
try (HTableInterface systemTable = env.getTable(SchemaUtil
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
systemTable, newState);
if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
continue;
}
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
if (leaveIndexActive) {
LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
+ result.getMutationCode());
// If we're not disabling the index, then we don't want to throw as throwing
// will lead to the RS being shutdown.
if (blockDataTableWritesOnFailure) {
throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
}
} else {
LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+ result.getMutationCode() + ". Will use default failure policy instead.");
throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
}
}
                if (leaveIndexActive) {
                    LOG.info("Successfully updated INDEX_DISABLE_TIMESTAMP for " + indexTableName
                            + " due to an exception while writing updates.", cause);
                } else {
                    LOG.info("Successfully disabled index " + indexTableName
                            + " due to an exception while writing updates.", cause);
                }
}
}
// Return the cell time stamp (note they should all be the same)
return timestamp;
}
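    /**
     * Resolves the names of the local indexes touched by the given mutations: opens a
     * server-side Phoenix connection to look up the data table's local indexes, then extracts
     * the view index id from each mutation's row key (offset past the region start key) to map
     * it back to an index name.
     */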
private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref,
Collection<Mutation> mutations) throws IOException {
Set<String> indexTableNames = new HashSet<String>(1);
PhoenixConnection conn = null;
try {
conn = QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap(
PhoenixConnection.class);
PTable dataTable = PhoenixRuntime.getTableNoCache(conn, ref.getTableName());
List<PTable> indexes = dataTable.getIndexes();
// local index used to get view id from index mutation row key.
PTable localIndex = null;
Map<ImmutableBytesWritable, String> localIndexNames =
new HashMap<ImmutableBytesWritable, String>();
for (PTable index : indexes) {
if (index.getIndexType() == IndexType.LOCAL) {
localIndex = index;
localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
index.getViewIndexId())), index.getName().getString());
}
}
if (localIndex == null) {
return Collections.emptySet();
}
IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable, conn);
HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
int offset =
regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
: regionInfo.getStartKey().length;
byte[] viewId = null;
for (Mutation mutation : mutations) {
viewId =
indexMaintainer.getViewIndexIdFromIndexRowKey(
new ImmutableBytesWritable(mutation.getRow(), offset,
mutation.getRow().length - offset));
String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId));
if (indexTableName == null) {
LOG.error("Unable to find local index on " + ref.getTableName() + " with viewID of " + Bytes.toStringBinary(viewId));
} else {
indexTableNames.add(indexTableName);
}
}
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (SQLException e) {
throw new IOException(e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
return indexTableNames;
}
/**
 * Check the table descriptor and config for whether to disable the index on index write failures
 * @param env the region coprocessor environment, used to read the table descriptor and configuration
 * @return the resolved value of {@link PhoenixIndexFailurePolicy#DISABLE_INDEX_ON_WRITE_FAILURE}
*/
public static boolean getDisableIndexOnFailure(RegionCoprocessorEnvironment env) {
HTableDescriptor htd = env.getRegion().getTableDesc();
Configuration config = env.getConfiguration();
String value = htd.getValue(PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE);
boolean disableIndexOnFailure;
if (value == null) {
disableIndexOnFailure =
config.getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX);
} else {
disableIndexOnFailure = Boolean.parseBoolean(value);
}
return disableIndexOnFailure;
}
/**
* If we're leaving the index active after index write failures on the server side, then we get
* the exception on the client side here after hitting the max # of hbase client retries. We
* disable the index as it may now be inconsistent. The indexDisableTimestamp was already set
* on the server side, so the rebuilder will be run.
*/
private static void handleIndexWriteFailureFromClient(IndexWriteException indexWriteException,
PhoenixConnection conn) {
handleExceptionFromClient(indexWriteException, conn, PIndexState.DISABLE);
}
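    /**
     * Called after a subsequent retry succeeds: moves the previously failing indexes from
     * PENDING_DISABLE back to ACTIVE.
     */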
private static void handleIndexWriteSuccessFromClient(IndexWriteException indexWriteException,
PhoenixConnection conn) {
handleExceptionFromClient(indexWriteException, conn, PIndexState.ACTIVE);
}
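    /**
     * Extracts the failed index table names from the exception (multi- or single-index variant)
     * and, if the failure policy disables indexes on failure, transitions each one to the given
     * state.
     */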
private static void handleExceptionFromClient(IndexWriteException indexWriteException,
PhoenixConnection conn, PIndexState indexState) {
try {
Set<String> indexesToUpdate = new HashSet<>();
if (indexWriteException instanceof MultiIndexWriteFailureException) {
MultiIndexWriteFailureException indexException =
(MultiIndexWriteFailureException) indexWriteException;
List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables();
if (indexException.isDisableIndexOnFailure() && failedIndexes != null) {
for (HTableInterfaceReference failedIndex : failedIndexes) {
String failedIndexTable = failedIndex.getTableName();
if (!indexesToUpdate.contains(failedIndexTable)) {
updateIndex(failedIndexTable, conn, indexState);
indexesToUpdate.add(failedIndexTable);
}
}
}
} else if (indexWriteException instanceof SingleIndexWriteFailureException) {
SingleIndexWriteFailureException indexException =
(SingleIndexWriteFailureException) indexWriteException;
String failedIndex = indexException.getTableName();
if (indexException.isDisableIndexOnFailure() && failedIndex != null) {
updateIndex(failedIndex, conn, indexState);
}
}
} catch (Exception handleE) {
LOG.warn("Error while trying to handle index write exception", indexWriteException);
}
}
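    /**
     * Command wrapper around the original batch mutation so that
     * {@link #doBatchWithRetries(MutateCommand, IndexWriteException, PhoenixConnection, ReadOnlyProps)}
     * can re-execute it on each retry.
     */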
public static interface MutateCommand {
void doMutation() throws IOException;
}
/**
 * Retries a mutation batch for which the index write failed.
 * One attempt should already have been made before calling this.
 * Max retries and exponential backoff logic mimic those of HBase's client.
 * If max retries are hit, the index is disabled.
 * If the write succeeds on a subsequent retry, the index is set back to ACTIVE.
 * @param mutateCommand mutation command to execute
 * @param iwe original IndexWriteException
 * @param connection connection to use
 * @param config config used to get retry settings
 * @throws IOException when retries are exhausted, or immediately on a non-index-write failure
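 * <p>
 * A minimal usage sketch; {@code retryBatch()} is a hypothetical re-execution of the
 * original batch write, and {@code iwe} is assumed to come from
 * {@link #getIndexWriteException(SQLException)} after the first attempt failed:
 * <pre>{@code
 * PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
 *     @Override
 *     public void doMutation() throws IOException {
 *         retryBatch(); // hypothetical: re-run the original batch write
 *     }
 * }, iwe, phoenixConnection, phoenixConnection.getQueryServices().getProps());
 * }</pre>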
*/
public static void doBatchWithRetries(MutateCommand mutateCommand,
IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config)
throws IOException {
incrementPendingDisableCounter(iwe, connection);
int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
int numRetry = 1; // already tried once
// calculate max time to retry for
int timeout = 0;
for (int i = 0; i < maxTries; ++i) {
timeout = (int) (timeout + ConnectionUtils.getPauseTime(pause, i));
}
long canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
while (canRetryMore(numRetry++, maxTries, canRetryUntil)) {
try {
Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff
mutateCommand.doMutation();
// success - change the index state from PENDING_DISABLE back to ACTIVE
handleIndexWriteSuccessFromClient(iwe, connection);
return;
} catch (IOException e) {
SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e);
if (inferredE == null || inferredE.getErrorCode() != SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
                    // not an index write failure; rethrow so it's handled normally in the caller's try-catch
throw e;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
// max retries hit - disable the index
handleIndexWriteFailureFromClient(iwe, connection);
throw new DoNotRetryIOException(iwe); // send failure back to client
}
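    /**
     * Before the retry loop starts, bumps PENDING_DISABLE_COUNT in SYSTEM.CATALOG for every
     * index named in the exception, so the server knows a client is still retrying against it.
     */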
    private static void incrementPendingDisableCounter(IndexWriteException indexWriteException, PhoenixConnection conn) {
try {
Set<String> indexesToUpdate = new HashSet<>();
if (indexWriteException instanceof MultiIndexWriteFailureException) {
MultiIndexWriteFailureException indexException =
(MultiIndexWriteFailureException) indexWriteException;
List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables();
if (indexException.isDisableIndexOnFailure() && failedIndexes != null) {
for (HTableInterfaceReference failedIndex : failedIndexes) {
String failedIndexTable = failedIndex.getTableName();
if (!indexesToUpdate.contains(failedIndexTable)) {
                            incrementCounterForIndex(conn, failedIndexTable);
indexesToUpdate.add(failedIndexTable);
}
}
}
} else if (indexWriteException instanceof SingleIndexWriteFailureException) {
SingleIndexWriteFailureException indexException =
(SingleIndexWriteFailureException) indexWriteException;
String failedIndex = indexException.getTableName();
if (indexException.isDisableIndexOnFailure() && failedIndex != null) {
                incrementCounterForIndex(conn, failedIndex);
}
}
} catch (Exception handleE) {
LOG.warn("Error while trying to handle index write exception", indexWriteException);
}
}
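    // The helpers below apply an atomic HBase Increment to the PENDING_DISABLE_COUNT cell on
    // the index's SYSTEM.CATALOG header row.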
private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException {
incrementCounterForIndex(conn, failedIndexTable, 1);
}
private static void decrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException {
incrementCounterForIndex(conn, failedIndexTable, -1);
}
    private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable, long amount) throws IOException {
byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
Increment incr = new Increment(indexTableKey);
incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount);
try {
conn.getQueryServices()
.getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
conn.getQueryServices().getProps()).getName())
.increment(incr);
} catch (SQLException e) {
throw new IOException(e);
}
}
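    /**
     * Mirrors the retry bookkeeping of HBase's client: retry while attempts remain, or while
     * the backoff deadline has not passed (unless only a single attempt is allowed).
     */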
private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) {
        // If only a single attempt is allowed, we must not take the elapsed time into account.
return numRetry < maxRetries
|| (maxRetries > 1 && EnvironmentEdgeManager.currentTimeMillis() < canRetryUntil);
}
/**
 * Converts a SQLException back to the IndexWriteException it wraps
 * @param sqlE the SQLException
 * @return the corresponding IndexWriteException, or null if the message does not indicate an index write failure
*/
public static IndexWriteException getIndexWriteException(SQLException sqlE) {
String sqlMsg = sqlE.getMessage();
if (sqlMsg.contains(MultiIndexWriteFailureException.FAILURE_MSG)) {
return new MultiIndexWriteFailureException(sqlMsg);
} else if (sqlMsg.contains(SingleIndexWriteFailureException.FAILED_MSG)) {
return new SingleIndexWriteFailureException(sqlMsg);
}
return null;
}
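    /**
     * Decrements PENDING_DISABLE_COUNT for the index, then transitions it to DISABLE (retries
     * exhausted) or back to ACTIVE (a retry succeeded, which also resets the disable timestamp
     * unless another client is still in PENDING_DISABLE state).
     */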
private static void updateIndex(String indexFullName, PhoenixConnection conn,
PIndexState indexState) throws SQLException, IOException {
        // Decrement the counter, since we get here once the client's retry has either finally failed or succeeded
decrementCounterForIndex(conn,indexFullName);
Long indexDisableTimestamp = null;
if (PIndexState.DISABLE.equals(indexState)) {
LOG.info("Disabling index after hitting max number of index write retries: "
+ indexFullName);
IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp);
} else if (PIndexState.ACTIVE.equals(indexState)) {
LOG.debug("Resetting index to active after subsequent success " + indexFullName);
            // On the server, the disable timestamp will be reset only if no other client is still in PENDING_DISABLE state
indexDisableTimestamp = 0L;
try {
IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp);
} catch (SQLException e) {
                // Some other client may already have moved the index to DISABLED, so we can
                // ignore the disallowed transition (DISABLED -> ACTIVE)
if (e.getErrorCode() != SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION.getErrorCode()) { throw e; }
}
}
}
}