blob: 877e1d4927fbe9d24394dde29ad59b8652db706a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.util.Time.now;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
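/**
 * Service to periodically check if the {@link org.apache.hadoop.hdfs.server.
 * federation.store.StateStoreService StateStoreService} cached information in
 * the {@link Router} is up to date. When the cached information is too stale,
 * the {@link Router} is put into safe mode; once the cache has been refreshed
 * again, the {@link Router} leaves safe mode unless it was entered manually.
 * This is for performance and removes the
 * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService
 * StateStoreService} from the critical path in common operations.
 */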
public class RouterSafemodeService extends PeriodicService {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterSafemodeService.class);

  /** Router to manage safe mode. */
  private final Router router;

  /**
   * Whether Router is in safe mode.
   * If we are in safe mode, fail requests as if a standby NN.
   * Router can enter safe mode in two different ways:
   *   1. upon start up: router enters this mode after service start, and will
   *      exit after certain time threshold;
   *   2. via admin command: router enters this mode via admin command:
   *        dfsrouteradmin -safemode enter
   *      and exit after admin command:
   *        dfsrouteradmin -safemode leave
   */
  private volatile boolean safeMode;

  /** Whether the Router safe mode is set manually (i.e., via Router admin). */
  private volatile boolean isSafeModeSetManually;

  /** Interval in ms to wait post startup before allowing RPC requests. */
  private long startupInterval;
  /** Interval in ms after which the State Store cache is too stale. */
  private long staleInterval;
  /**
   * Start time of this service in ms, taken from the monotonic clock. Only
   * meaningful when compared against another monotonic reading.
   */
  private long startupTime;

  /**
   * The time the Router entered safe mode in ms, taken from the monotonic
   * clock. Only meaningful when compared against another monotonic reading.
   */
  private long enterSafeModeTime = Time.monotonicNow();

  /**
   * Create a new safe mode service.
   *
   * @param router Router whose safe mode state is managed by this service.
   */
  public RouterSafemodeService(Router router) {
    super(RouterSafemodeService.class.getSimpleName());
    this.router = router;
  }

  /**
   * Return whether the current Router is in safe mode.
   *
   * @return true if the Router is currently in safe mode.
   */
  boolean isInSafeMode() {
    return this.safeMode;
  }

  /**
   * Set the flag to indicate that the safe mode for this Router is set manually
   * via the Router admin command. The safe mode flag itself is set to the same
   * value; the Router service state is not touched by this method.
   *
   * @param mode true to manually enter safe mode, false to manually leave it.
   */
  void setManualSafeMode(boolean mode) {
    this.safeMode = mode;
    this.isSafeModeSetManually = mode;
  }

  /**
   * Enter safe mode: record the entry time and move the Router to the
   * {@link RouterServiceState#SAFEMODE} state.
   */
  private void enter() {
    LOG.info("Entering safe mode");
    // Monotonic clock: the value is only used to compute a duration in leave()
    enterSafeModeTime = Time.monotonicNow();
    safeMode = true;
    router.updateRouterState(RouterServiceState.SAFEMODE);
  }

  /**
   * Leave safe mode: report the time spent in safe mode to the Router metrics
   * (when available) and move the Router to the
   * {@link RouterServiceState#RUNNING} state.
   */
  private void leave() {
    // Duration computed from two monotonic readings, so it cannot be skewed
    // by adjustments of the system wall clock
    long timeInSafemode = Time.monotonicNow() - enterSafeModeTime;
    LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode);
    RouterMetrics routerMetrics = router.getRouterMetrics();
    if (routerMetrics == null) {
      LOG.error("The Router metrics are not enabled");
    } else {
      routerMetrics.setSafeModeTime(timeInSafemode);
    }
    safeMode = false;
    router.updateRouterState(RouterServiceState.RUNNING);
  }

  /**
   * Read the check interval, the startup extension and the cache expiration
   * from the configuration, record the startup time and put the Router in safe
   * mode.
   *
   * @param conf Configuration to read the intervals from.
   * @throws Exception If the parent service cannot be initialized.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {

    // Use same interval as cache update service
    this.setIntervalMs(conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
        RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
        TimeUnit.MILLISECONDS));

    this.startupInterval = conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION,
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT,
        TimeUnit.MILLISECONDS);
    LOG.info("Leave startup safe mode after {} ms", this.startupInterval);

    this.staleInterval = conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION,
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT,
        TimeUnit.MILLISECONDS);
    LOG.info("Enter safe mode after {} ms without reaching the State Store",
        this.staleInterval);

    // Monotonic clock: only used to measure the time elapsed since startup
    this.startupTime = Time.monotonicNow();

    // The service starts in safe mode; periodicInvoke() leaves it later
    enter();

    super.serviceInit(conf);
  }

  /**
   * Periodic check of the safe mode state. Nothing changes during the startup
   * interval. Afterwards, the Router enters safe mode when the State Store
   * cache is stale, and leaves it when the cache is fresh again unless safe
   * mode was set manually.
   */
  @Override
  public void periodicInvoke() {
    // Startup delay is measured on the monotonic clock so that wall clock
    // adjustments can neither shorten nor extend it
    long delta = Time.monotonicNow() - startupTime;
    if (delta < startupInterval) {
      LOG.info("Delaying safemode exit for {} milliseconds...",
          this.startupInterval - delta);
      return;
    }

    StateStoreService stateStore = router.getStateStore();
    long cacheUpdateTime = stateStore.getCacheUpdateTime();
    // NOTE(review): the cache update time is compared against the wall clock,
    // as before; its time base is defined by the State Store - confirm there
    // before switching this comparison to the monotonic clock.
    boolean isCacheStale = (now() - cacheUpdateTime) > this.staleInterval;

    if (isCacheStale) {
      // Cache is too old; make sure we are in safe mode
      if (!safeMode) {
        enter();
      }
    } else if (safeMode && !isSafeModeSetManually) {
      // Cache recently updated, leave safe mode
      leave();
    }
  }
}