blob: a0744a652bb1aef06df7f6751aa91ecc41de2b99 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* A service to initialize a
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
* StateStoreDriver} and maintain the connection to the data store. There are
* multiple state store driver connections supported:
* <ul>
* <li>File
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
* StateStoreFileImpl StateStoreFileImpl}
* <li>ZooKeeper
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl.
* StateStoreZooKeeperImpl StateStoreZooKeeperImpl}
* </ul>
* <p>
* The service also supports the dynamic registration of record stores like:
* <ul>
* <li>{@link MembershipStore}: state of the Namenodes in the
* federation.
* <li>{@link MountTableStore}: Mount table between subclusters.
* See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
* <li>{@link RebalancerStore}: Log of the rebalancing operations.
* <li>{@link RouterStore}: Router state in the federation.
* <li>{@link TokenStore}: Tokens in the federation.
* </ul>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StateStoreService extends CompositeService {
/** Logger for this service. */
private static final Logger LOG =
LoggerFactory.getLogger(StateStoreService.class);
/** State Store configuration, captured in {@code serviceInit}. */
private Configuration conf;
/** Identifier for the service; passed to the driver when it is initialized. */
private String identifier;
/** Driver for the back end connection, instantiated in {@code serviceInit}. */
private StateStoreDriver driver;
/** Service to maintain data store connection. */
private StateStoreConnectionMonitorService monitorService;
/** StateStore metrics; reset to {@code null} when the service stops. */
private StateStoreMetrics metrics;
/** Supported record stores, keyed by the record class each store reports. */
private final Map<
Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
recordStores;
/** Service to maintain State Store caches. */
private StateStoreCacheUpdateService cacheUpdater;
/**
 * Local time of the last refresh in which every cache was loaded
 * successfully.
 */
private long cacheLastUpdateTime;
/** Caches of the record stores added by this service itself. */
private final List<StateStoreCache> cachesToUpdateInternal;
/** Caches added from outside through {@code registerCacheExternal}. */
private final List<StateStoreCache> cachesToUpdateExternal;
/**
 * Create the service with empty registries. Record stores and caches are
 * added later, during initialization.
 */
public StateStoreService() {
  super(StateStoreService.class.getName());

  // Cache registries: one for the stores owned here, one for outside clients
  this.cachesToUpdateExternal = new ArrayList<>();
  this.cachesToUpdateInternal = new ArrayList<>();

  // Record class -> record store handled by this implementation
  this.recordStores = new HashMap<>();
}
/**
 * Initialize the State Store and the connection to the backend. This
 * instantiates the configured driver, registers the supported record
 * stores, adds the connection monitor and cache update child services,
 * configures the record expiration intervals and exposes the metrics
 * through JMX.
 *
 * @param config Configuration for the State Store.
 * @throws Exception If the driver or a record store cannot be created, or
 *           if the parent service fails to initialize.
 */
@Override
protected void serviceInit(Configuration config) throws Exception {
  this.conf = config;

  // Create implementation of State Store
  Class<? extends StateStoreDriver> driverClass = this.conf.getClass(
      RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
      RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT,
      StateStoreDriver.class);
  this.driver = ReflectionUtils.newInstance(driverClass, this.conf);
  if (this.driver == null) {
    throw new IOException("Cannot create driver for the State Store");
  }

  // Add supported record stores
  addRecordStore(MembershipStoreImpl.class);
  addRecordStore(MountTableStoreImpl.class);
  addRecordStore(RouterStoreImpl.class);

  // Check the connection to the State Store periodically
  this.monitorService = new StateStoreConnectionMonitorService(this);
  this.addService(monitorService);

  // Set expiration intervals for each record. Both keys are read as time
  // durations so that values with a unit suffix are accepted for either
  // one; plain numbers are still interpreted as milliseconds.
  MembershipState.setExpirationMs(conf.getTimeDuration(
      RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
      RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT,
      TimeUnit.MILLISECONDS));
  RouterState.setExpirationMs(conf.getTimeDuration(
      RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
      RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
      TimeUnit.MILLISECONDS));

  // Cache update service
  this.cacheUpdater = new StateStoreCacheUpdateService(this);
  addService(this.cacheUpdater);

  // Create metrics for the State Store
  this.metrics = StateStoreMetrics.create(conf);

  // Adding JMX interface
  try {
    StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class);
    ObjectName registeredObject =
        MBeans.register("Router", "StateStore", bean);
    LOG.info("Registered StateStoreMBean: {}", registeredObject);
  } catch (NotCompliantMBeanException e) {
    throw new RuntimeException("Bad StateStoreMBean setup", e);
  } catch (MetricsException e) {
    // Registration failure is not fatal; keep the stack trace in the log.
    LOG.error("Failed to register State Store bean {}", e.getMessage(), e);
  }

  super.serviceInit(this.conf);
}
/**
 * Start the State Store: try to open the driver connection first and then
 * start the child services.
 *
 * @throws Exception If the parent service fails to start.
 */
@Override
protected void serviceStart() throws Exception {
  this.loadDriver();
  super.serviceStart();
}
/**
 * Stop the State Store: close the driver, shut down the metrics and stop
 * the child services. Each step runs even if the previous one throws, so a
 * failing driver close cannot leave the metrics or the child services
 * running.
 *
 * @throws Exception If the driver cannot be closed or the parent service
 *           fails to stop.
 */
@Override
protected void serviceStop() throws Exception {
  try {
    closeDriver();
  } finally {
    try {
      if (metrics != null) {
        metrics.shutdown();
        metrics = null;
      }
    } finally {
      super.serviceStop();
    }
  }
}
/**
 * Add a record store to the State Store. The store is instantiated against
 * the current driver, indexed by the record class it reports and, when it
 * is also a {@link StateStoreCache}, added to the internal caches to
 * update.
 *
 * @param <T> Type of the record store.
 * @param clazz Class of the record store to track.
 * @throws ReflectiveOperationException If the record store cannot be
 *           instantiated.
 */
private <T extends RecordStore<?>> void addRecordStore(
    final Class<T> clazz) throws ReflectiveOperationException {

  assert this.getServiceState() == STATE.INITED :
      "Cannot add record to the State Store once started";

  final T store = RecordStore.newInstance(clazz, this.getDriver());
  final Class<? extends BaseRecord> managedRecord = store.getRecordClass();
  this.recordStores.put(managedRecord, store);

  // Stores that keep a cache take part in the periodic refresh
  if (store instanceof StateStoreCache) {
    this.cachesToUpdateInternal.add((StateStoreCache) store);
  }
}
/**
 * Get the record store in this State Store for a given interface. The
 * first registered store that is an instance of the requested class is
 * returned.
 *
 * @param <T> Type of the record store.
 * @param recordStoreClass Class of the record store.
 * @return Registered record store or null if not found.
 */
public <T extends RecordStore<?>> T getRegisteredRecordStore(
    final Class<T> recordStoreClass) {
  for (RecordStore<? extends BaseRecord> candidate :
      this.recordStores.values()) {
    if (!recordStoreClass.isInstance(candidate)) {
      continue;
    }
    // Checked cast; cannot fail after the isInstance test above
    return recordStoreClass.cast(candidate);
  }
  return null;
}
/**
 * List of records supported by this State Store.
 *
 * @return Read-only view of the supported record classes. The view is
 *         backed by the registry of record stores; it is unmodifiable so
 *         that callers cannot remove registered stores through it.
 */
public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
  return Collections.unmodifiableSet(this.recordStores.keySet());
}
/**
 * Load the State Store driver. If successful, refresh cached data tables.
 * Nothing is done when the driver is already ready. The check and the
 * initialization run while holding the driver's monitor.
 */
public void loadDriver() {
  synchronized (this.driver) {
    if (isDriverReady()) {
      return;
    }
    final String name = this.driver.getClass().getSimpleName();
    final boolean opened = this.driver.init(
        conf, getIdentifier(), getSupportedRecords(), metrics);
    if (!opened) {
      LOG.error("Cannot initialize State Store driver {}", name);
      return;
    }
    LOG.info("Connection to the State Store driver {} is open and ready",
        name);
    this.refreshCaches();
  }
}
/**
 * Check if the driver is ready to be used.
 *
 * @return If the driver is ready. A driver that has not been created yet
 *         is reported as not ready instead of failing with a
 *         NullPointerException.
 */
public boolean isDriverReady() {
  return this.driver != null && this.driver.isDriverReady();
}
/**
 * Manually shuts down the driver. Does nothing when no driver has been
 * created.
 *
 * @throws Exception If the driver cannot be closed.
 */
@VisibleForTesting
public void closeDriver() throws Exception {
  if (this.driver == null) {
    return;
  }
  this.driver.close();
}
/**
 * Get the driver used to reach the data store.
 *
 * @return State store driver.
 */
public StateStoreDriver getDriver() {
  return driver;
}
/**
 * Fetch the unique identifier of this state store instance, as previously
 * set through {@link #setIdentifier(String)}. Typically it is the address
 * of the router.
 *
 * @return Unique identifier for this store.
 */
public String getIdentifier() {
  return identifier;
}
/**
 * Set the unique synchronization identifier for this store.
 *
 * @param id Unique identifier, typically the router's RPC address.
 */
public void setIdentifier(String id) {
  identifier = id;
}
//
// Cached state store data
//
/**
 * The last time the state store cache was fully updated, i.e., the last
 * refresh in which every cache was loaded.
 *
 * @return Timestamp.
 */
public long getCacheUpdateTime() {
  return cacheLastUpdateTime;
}
/**
 * Stops the cache update service and removes it from the child services.
 * Does nothing when the updater is not set.
 */
@VisibleForTesting
public void stopCacheUpdateService() {
  if (this.cacheUpdater == null) {
    return;
  }
  this.cacheUpdater.stop();
  removeService(this.cacheUpdater);
  this.cacheUpdater = null;
}
/**
 * Register an external cache so it is included in the cache refreshes of
 * this service.
 *
 * @param client Client to the state store.
 */
public void registerCacheExternal(StateStoreCache client) {
  cachesToUpdateExternal.add(client);
}
/**
 * Refresh the cache with information from the State Store without forcing
 * the update. Called periodically by the CacheUpdateService to maintain
 * data caches and versions.
 */
public void refreshCaches() {
  final boolean force = false;
  refreshCaches(force);
}
/**
 * Refresh the cache with information from the State Store. Called
 * periodically by the CacheUpdateService to maintain data caches and
 * versions. Every registered cache is attempted even if an earlier one
 * fails; the last update time only advances when all of them load.
 *
 * @param force If we force the refresh.
 */
public void refreshCaches(boolean force) {
  if (!isDriverReady()) {
    LOG.info("Skipping State Store cache update, driver is not ready.");
    return;
  }

  // Work on a snapshot of both registries
  final List<StateStoreCache> targets =
      new ArrayList<>(cachesToUpdateInternal);
  targets.addAll(cachesToUpdateExternal);

  boolean allLoaded = true;
  for (StateStoreCache target : targets) {
    final String name = target.getClass().getSimpleName();
    boolean loaded;
    try {
      loaded = target.loadCache(force);
    } catch (IOException e) {
      LOG.error("Error updating cache for {}", name, e);
      loaded = false;
    }
    if (!loaded) {
      allLoaded = false;
      LOG.error("Cache update failed for cache {}", name);
    }
  }

  if (allLoaded) {
    // Uses local time, not driver time.
    this.cacheLastUpdateTime = Time.now();
  }
}
/**
 * Update the cache for a specific record store without forcing the update.
 *
 * @param clazz Class of the record store.
 * @return If the cache was loaded.
 * @throws IOException if the cache update failed.
 */
public boolean loadCache(final Class<?> clazz) throws IOException {
  final boolean force = false;
  return loadCache(clazz, force);
}
/**
 * Update the cache for a specific record store. Only the first registered
 * cache (internal ones first, then external) that is an instance of the
 * given class is loaded.
 *
 * @param clazz Class of the record store.
 * @param force Force the update ignoring cached periods.
 * @return If the cache was loaded.
 * @throws IOException if the cache update failed or no registered cache
 *           matches the class.
 */
public boolean loadCache(Class<?> clazz, boolean force) throws IOException {
  // Work on a snapshot of both registries
  final List<StateStoreCache> candidates =
      new ArrayList<>(this.cachesToUpdateInternal);
  candidates.addAll(this.cachesToUpdateExternal);

  for (StateStoreCache candidate : candidates) {
    if (!clazz.isInstance(candidate)) {
      continue;
    }
    return candidate.loadCache(force);
  }
  throw new IOException("Registered cache was not found for " + clazz);
}
/**
 * Get the metrics for the State Store.
 *
 * @return State Store metrics; null once the service has been stopped.
 */
public StateStoreMetrics getMetrics() {
  return this.metrics;
}
}