/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
/**
 * HBASE-11580: With the async WAL approach (HBASE-11568), edits are not persisted to the WAL in
 * secondary region replicas. This means that a secondary region replica can serve some edits from
 * its memstore that have not yet been flushed from the primary. We do not want the secondary
 * region's seqId to go back in time when this secondary region is opened elsewhere after a crash
 * or region move. We therefore trigger a flush of the cache in the primary region replica and
 * wait until we observe a complete flush cycle before marking the region readsEnabled. This
 * handler does the flushing of the primary region replica and ensures that regular region opening
 * is not blocked while the secondary replica waits on the flush.
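 * <p>
 * A minimal sketch of how this handler is typically kicked off for a freshly opened secondary
 * replica (the executor field name is illustrative; the constructor is the one defined below):
 *
 * <pre>
 * // reads on the secondary stay disabled until a primary flush has been triggered
 * executorService.submit(new RegionReplicaFlushHandler(server, region));
 * </pre>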
*/
@InterfaceAudience.Private
public class RegionReplicaFlushHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
private final AsyncClusterConnection connection;
private final HRegion region;
public RegionReplicaFlushHandler(Server server, HRegion region) {
super(server, EventType.RS_REGION_REPLICA_FLUSH);
this.connection = server.getAsyncClusterConnection();
this.region = region;
}
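
  /** Triggers the flush on the primary replica; see {@link #triggerFlushInPrimaryRegion(HRegion)}. */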
@Override
public void process() throws IOException {
triggerFlushInPrimaryRegion(region);
}
@Override
protected void handleException(Throwable t) {
if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
LOG.error("Caught throwable while processing event " + eventType, t);
} else if (t instanceof RuntimeException) {
server.abort("Server aborting", t);
} else {
      // Something is fishy, since we could not flush the primary region even after all retries
      // (RPC-level retries times the handler's own retry count). We cannot close the region,
      // since there is no mechanism to close a region without the master triggering it, so we
      // just abort the server for now.
      server.abort("Server aborting because an exception was thrown", t);
}
}
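
  /**
   * Work out how many times this handler should retry the flush request. The region server side
   * may already have multiplied the configured client retry count (hbase.client.retries.number)
   * by the server-side retries multiplier, so divide that back out to recover the plain client
   * retry count.
   */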
private int getRetriesCount(Configuration conf) {
int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
if (numRetries > 10) {
int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
numRetries = numRetries / mult; // reset if HRS has multiplied this already
}
return numRetries;
}
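
  /**
   * Ask the primary replica of this region to flush, retrying on failure, and enable reads on
   * this secondary replica once a flush (or an empty-flush WAL marker) has been triggered on the
   * primary.
   */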
void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
int maxAttempts = getRetriesCount(connection.getConfiguration());
RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
if (LOG.isDebugEnabled()) {
LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
" from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {
// TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
// do not have to wait for the whole flush here, just initiate it.
FlushRegionResponse response;
try {
response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true));
} catch (IOException e) {
if (e instanceof TableNotFoundException || FutureUtils
.get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) {
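          // the table was dropped or disabled while we were trying to flush, nothing left to do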
return;
}
if (!counter.shouldRetry()) {
throw e;
}
        // The reason we need to retry here is that the retry logic for asynchronous admin
        // requests is much simpler than for normal operations: if we fail to locate the region
        // once, the exception is thrown out and no relocation is attempted. So we add some
        // retries of our own here to avoid shutting down the region server too frequently.
LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString(),
region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e);
try {
counter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
throw new InterruptedIOException(e1.getMessage());
}
continue;
}
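      // Possible outcomes: the primary flushed, the primary wrote (or failed to write) an
      // empty-flush WAL marker, or an old server left the marker field unset entirely.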
if (response.getFlushed()) {
        // then we have to wait until we see the flush entry. All reads will be rejected until we
        // see a complete flush cycle or replay a region open event
if (LOG.isDebugEnabled()) {
LOG.debug("Triggered flush of primary region replica " +
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString() +
" for " + region.getRegionInfo().getEncodedName() +
"; now waiting and blocking reads until completes a full flush cycle");
}
region.setReadsEnabled(true);
break;
} else {
if (response.hasWroteFlushWalMarker()) {
if (response.getWroteFlushWalMarker()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Triggered empty flush marker (memstore empty) on primary region replica " +
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()).
getRegionNameAsString() + " for " + region.getRegionInfo().getEncodedName() +
"; now waiting and blocking reads until observing a flush marker");
}
region.setReadsEnabled(true);
break;
} else {
            // somehow we were not able to get the primary to write the flush WAL marker. It may
            // be closing or already flushing. Retry the flush after some sleep.
if (!counter.shouldRetry()) {
throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
region.getRegionInfo().getRegionNameAsString());
} else {
LOG.warn(
"Cannot cause primary replica {} to flush or drop a wal marker " +
"for region replica {}, retry={}",
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
.getRegionNameAsString(),
region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
}
}
} else {
// nothing to do. Are we dealing with an old server?
LOG.warn(
"Was not able to trigger a flush from primary region due to old server version? " +
"Continuing to open the secondary region replica: " +
region.getRegionInfo().getRegionNameAsString());
break;
}
}
try {
counter.sleepUntilNextRetry();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
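    // Also enable reads on the exit paths that did not do it above (old server version, region
    // closing, or server stopping).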
region.setReadsEnabled(true);
}
}