blob: cdd44490901ac752a574c0dba9a67355dfca8403 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Record store that takes care of caching the records in memory.
*
* @param <R> Record to store by this interface.
*/
public abstract class CachedRecordStore<R extends BaseRecord>
extends RecordStore<R> implements StateStoreCache {
private static final Logger LOG =
LoggerFactory.getLogger(CachedRecordStore.class);
/** Prevent loading the cache more than once every 500 ms. */
private static final long MIN_UPDATE_MS = 500;
/** Cached entries. */
private List<R> records = new ArrayList<>();
/** Time stamp of the cached entries. */
private long timestamp = -1;
/** If the cache is initialized. */
private boolean initialized = false;
/** Last time the cache was updated. */
private long lastUpdate = -1;
/** Lock to access the memory cache. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
/** If it should override the expired values when loading the cache. */
private boolean override = false;
/**
* Create a new cached record store.
*
* @param clazz Class of the record to store.
* @param driver State Store driver.
*/
protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
this(clazz, driver, false);
}
/**
* Create a new cached record store.
*
* @param clazz Class of the record to store.
* @param driver State Store driver.
* @param over If the entries should be override if they expire
*/
protected CachedRecordStore(
Class<R> clazz, StateStoreDriver driver, boolean over) {
super(clazz, driver);
this.override = over;
}
/**
* Check that the cache of the State Store information is available.
*
* @throws StateStoreUnavailableException If the cache is not initialized.
*/
private void checkCacheAvailable() throws StateStoreUnavailableException {
if (!this.initialized) {
throw new StateStoreUnavailableException(
"Cached State Store not initialized, " +
getRecordClass().getSimpleName() + " records not valid");
}
}
@Override
public boolean loadCache(boolean force) throws IOException {
// Prevent loading the cache too frequently
if (force || isUpdateTime()) {
List<R> newRecords = null;
long t = -1;
try {
QueryResult<R> result = getDriver().get(getRecordClass());
newRecords = result.getRecords();
t = result.getTimestamp();
// If we have any expired record, update the State Store
if (this.override) {
overrideExpiredRecords(result);
}
} catch (IOException e) {
LOG.error("Cannot get \"{}\" records from the State Store",
getRecordClass().getSimpleName());
this.initialized = false;
return false;
}
// Update cache atomically
writeLock.lock();
try {
this.records.clear();
this.records.addAll(newRecords);
this.timestamp = t;
this.initialized = true;
} finally {
writeLock.unlock();
}
// Update the metrics for the cache State Store size
StateStoreMetrics metrics = getDriver().getMetrics();
if (metrics != null) {
String recordName = getRecordClass().getSimpleName();
metrics.setCacheSize(recordName, this.records.size());
}
lastUpdate = Time.monotonicNow();
}
return true;
}
/**
* Check if it's time to update the cache. Update it it was never updated.
*
* @return If it's time to update this cache.
*/
private boolean isUpdateTime() {
return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS;
}
/**
* Updates the state store with any record overrides we detected, such as an
* expired state.
*
* @param query RecordQueryResult containing the data to be inspected.
* @throws IOException
*/
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
List<R> commitRecords = new ArrayList<>();
List<R> newRecords = query.getRecords();
long currentDriverTime = query.getTimestamp();
if (newRecords == null || currentDriverTime <= 0) {
LOG.error("Cannot check overrides for record");
return;
}
for (R record : newRecords) {
if (record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
commitRecords.add(record);
}
}
if (commitRecords.size() > 0) {
getDriver().putAll(commitRecords, true, false);
}
}
/**
* Updates the state store with any record overrides we detected, such as an
* expired state.
*
* @param record Record record to be updated.
* @throws IOException
*/
public void overrideExpiredRecord(R record) throws IOException {
List<R> newRecords = Collections.singletonList(record);
long time = getDriver().getTime();
QueryResult<R> query = new QueryResult<>(newRecords, time);
overrideExpiredRecords(query);
}
/**
* Get all the cached records.
*
* @return Copy of the cached records.
* @throws StateStoreUnavailableException If the State store is not available.
*/
public List<R> getCachedRecords() throws StateStoreUnavailableException {
checkCacheAvailable();
List<R> ret = new LinkedList<R>();
this.readLock.lock();
try {
ret.addAll(this.records);
} finally {
this.readLock.unlock();
}
return ret;
}
/**
* Get all the cached records and the time stamp of the cache.
*
* @return Copy of the cached records and the time stamp.
* @throws StateStoreUnavailableException If the State store is not available.
*/
protected QueryResult<R> getCachedRecordsAndTimeStamp()
throws StateStoreUnavailableException {
checkCacheAvailable();
this.readLock.lock();
try {
return new QueryResult<R>(this.records, this.timestamp);
} finally {
this.readLock.unlock();
}
}
}