// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Logger;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.JniCatalogConstants;
import org.apache.impala.thrift.THdfsCachingOp;
import com.google.common.base.Preconditions;
/**
* Utility class for submitting and dropping HDFS cache requests.
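 *
 * A minimal end-to-end usage sketch (the pool name "testPool" and the msTable
 * handle are hypothetical, shown for illustration only):
 * <pre>{@code
 *   // Cache the table's location in pool "testPool" with replication 1; the
 *   // directive ID is also recorded in the table's parameters.
 *   long id = HdfsCachingUtil.submitCacheTblDirective(msTable, "testPool", (short) 1);
 *   // Block until all bytes are cached or progress stalls.
 *   HdfsCachingUtil.waitForDirective(id);
 *   // Later: drop the directive again, uncaching the data.
 *   HdfsCachingUtil.removeTblCacheDirective(msTable);
 * }</pre>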
*/
public class HdfsCachingUtil {
private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class);
// The key name used to save cache directive IDs in table/partition properties.
public final static String CACHE_DIR_ID_PROP_NAME = "cache_directive_id";
  // The key name used to store the replication factor for cached files.
public final static String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication";
// The number of caching refresh intervals that can go by when waiting for data to
// become cached before assuming no more progress is being made.
private final static int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5;
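  // Lazily initialized in getDfs() and shared by all cache directive operations.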
private static DistributedFileSystem dfs = null;
/**
* Returns the dfs singleton object.
*/
private static DistributedFileSystem getDfs() throws ImpalaRuntimeException {
if (dfs == null) {
try {
dfs = FileSystemUtil.getDistributedFileSystem();
} catch (IOException e) {
throw new ImpalaRuntimeException("HdfsCachingUtil failed to initialize the " +
"DistributedFileSystem: ", e);
}
}
return dfs;
}
/**
* Caches the location of the given Hive Metastore Table and updates the
* table's properties with the submitted cache directive ID. The caller is
* responsible for not caching the same table twice, as HDFS will create a second
* cache directive even if it is similar to an already existing one.
*
* Returns the ID of the submitted cache directive and throws if there is an error
 * submitting the directive.
*/
public static long submitCacheTblDirective(
org.apache.hadoop.hive.metastore.api.Table table,
String poolName, short replication) throws ImpalaRuntimeException {
long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()),
poolName, replication);
table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
return id;
}
/**
* Caches the location of the given partition and updates the
 * partition's properties with the submitted cache directive ID. The caller is
* responsible for not caching the same partition twice, as HDFS will create a second
* cache directive even if it is similar to an already existing one.
*
* Returns the ID of the submitted cache directive and throws if there is an error
* submitting the directive.
*/
public static long submitCachePartitionDirective(HdfsPartition part,
String poolName, short replication) throws ImpalaRuntimeException {
long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
poolName, replication);
part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
return id;
}
/**
* Convenience method for working directly on a metastore partition. See
* submitCachePartitionDirective(HdfsPartition, String, short) for more details.
*/
public static long submitCachePartitionDirective(
org.apache.hadoop.hive.metastore.api.Partition part,
String poolName, short replication) throws ImpalaRuntimeException {
long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()),
poolName, replication);
part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
return id;
}
/**
* Removes the cache directive associated with the table from HDFS, uncaching all
* data. Also updates the table's metadata. No-op if the table is not cached.
*/
public static void removeTblCacheDirective(
org.apache.hadoop.hive.metastore.api.Table table) throws ImpalaRuntimeException {
Preconditions.checkNotNull(table);
if (LOG.isTraceEnabled()) {
LOG.trace("Uncaching table: " + table.getDbName() + "." + table.getTableName());
}
Map<String, String> parameters = table.getParameters();
if (parameters == null) {
LOG.warn("removePartitionCacheDirective(): table " + table.getTableName() +
"has a null parameter map.");
}
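    // getCacheDirectiveId() below tolerates a null map and returns null, so we can
    // still fall through to the directive lookup after the warning above.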
Long id = getCacheDirectiveId(parameters);
if (id == null) {
LOG.warn("removePartitionCacheDirective(): table " + table.getTableName() +
"doesn't have a cache directive id.");
return;
}
HdfsCachingUtil.removeDirective(id);
table.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
table.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
}
/**
* Removes the cache directive associated with the partition from HDFS, uncaching all
* data. Also updates the partition's metadata to remove the cache directive ID.
 * No-op if the partition is not cached.
*/
public static void removePartitionCacheDirective(FeFsPartition part)
throws ImpalaException {
Preconditions.checkNotNull(part);
Map<String, String> parameters = part.getParameters();
if (parameters == null) {
LOG.warn("removePartitionCacheDirective(): partition " + part.getPartitionName() +
"has a null parameter map.");
}
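    // As above, getCacheDirectiveId() handles a null map by returning null.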
Long id = getCacheDirectiveId(parameters);
if (id == null) {
LOG.warn("removePartitionCacheDirective(): partition " + part.getPartitionName() +
"doesn't have a cache directive id.");
return;
}
HdfsCachingUtil.removeDirective(id);
part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
}
/**
* Convenience method for working directly on a metastore partition. See
 * removePartitionCacheDirective(FeFsPartition) for more details.
*/
public static void removePartitionCacheDirective(
org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
Preconditions.checkNotNull(part);
Long id = getCacheDirectiveId(part.getParameters());
if (id == null) return;
HdfsCachingUtil.removeDirective(id);
part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
}
/**
* Returns the cache directive ID from the given table/partition parameter
* map. Returns null if the CACHE_DIR_ID_PROP_NAME key was not set or if
* there was an error parsing the associated ID.
*/
public static Long getCacheDirectiveId(Map<String, String> params) {
if (params == null) return null;
String idStr = params.get(CACHE_DIR_ID_PROP_NAME);
if (idStr == null) return null;
try {
return Long.parseLong(idStr);
} catch (NumberFormatException e) {
return null;
}
}
/**
* Given a cache directive ID, returns the pool the directive is cached in.
 * Returns null if no outstanding cache directives match this ID.
*/
public static String getCachePool(long directiveId)
throws ImpalaRuntimeException {
CacheDirectiveEntry entry = getDirective(directiveId);
return entry == null ? null : entry.getInfo().getPool();
}
/**
* Given a cache directive ID, returns the replication factor for the directive.
* Returns null if no outstanding cache directives match this ID.
*/
public static Short getCacheReplication(long directiveId)
throws ImpalaRuntimeException {
CacheDirectiveEntry entry = getDirective(directiveId);
return entry != null ? entry.getInfo().getReplication() : null;
}
/**
 * Returns the cache replication value from the parameters map of a cached table.
 * Falls back to the default replication factor if the property is missing or
 * cannot be parsed.
*/
public static Short getCachedCacheReplication(Map<String, String> params) {
Preconditions.checkNotNull(params);
String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
if (replication == null) {
return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
}
try {
return Short.parseShort(replication);
} catch (NumberFormatException e) {
return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
}
}
/**
 * Waits on a cache directive to either complete or stop making progress. Progress is
 * checked by polling the HDFS caching stats every
 * DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS and verifying that the request's
 * "currentBytesCached" is increasing towards "bytesNeeded".
 * This function returns once "currentBytesCached" == "bytesNeeded", or once no
 * progress has been made for MAX_UNCHANGED_CACHING_REFRESH_INTERVALS consecutive
 * intervals.
*/
public static void waitForDirective(long directiveId)
throws ImpalaRuntimeException {
long bytesNeeded = 0L;
long currentBytesCached = 0L;
CacheDirectiveEntry cacheDir = getDirective(directiveId);
if (cacheDir == null) return;
bytesNeeded = cacheDir.getStats().getBytesNeeded();
currentBytesCached = cacheDir.getStats().getBytesCached();
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Waiting on cache directive id: %d. Bytes " +
"cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded));
}
// All the bytes are cached, just return.
if (bytesNeeded == currentBytesCached) return;
// The refresh interval is how often HDFS will update cache directive stats. We use
// this value to determine how frequently we should poll for changes.
long hdfsRefreshIntervalMs = getDfs().getConf().getLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
Preconditions.checkState(hdfsRefreshIntervalMs > 0);
// Loop until either MAX_UNCHANGED_CACHING_REFRESH_INTERVALS have passed with no
// changes or all required data is cached.
int unchangedCounter = 0;
while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) {
long previousBytesCached = currentBytesCached;
cacheDir = getDirective(directiveId);
if (cacheDir == null) return;
currentBytesCached = cacheDir.getStats().getBytesCached();
bytesNeeded = cacheDir.getStats().getBytesNeeded();
if (currentBytesCached == bytesNeeded) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Cache directive id: %d has completed." +
"Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached,
bytesNeeded));
}
return;
}
if (currentBytesCached == previousBytesCached) {
++unchangedCounter;
} else {
unchangedCounter = 0;
}
try {
// Sleep for the refresh interval + a little bit more to ensure a full interval
// has completed. A value of 25% the refresh interval was arbitrarily chosen.
Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25));
} catch (InterruptedException e) { /* ignore */ }
}
LOG.warn(String.format("No changes in cached bytes in: %d(ms). All data may not " +
"be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " +
"(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS,
directiveId, currentBytesCached, bytesNeeded));
}
/**
* Submits a new caching directive for the specified cache pool name, path and
 * replication. Returns the directive ID if the submission was successful; throws
 * an ImpalaRuntimeException if the submission fails.
*/
private static long submitDirective(Path path, String poolName, short replication)
throws ImpalaRuntimeException {
Preconditions.checkNotNull(path);
Preconditions.checkState(poolName != null && !poolName.isEmpty());
CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
.setPool(poolName)
.setReplication(replication)
.setPath(path).build();
if (LOG.isTraceEnabled()) {
LOG.trace("Submitting cache directive: " + info.toString());
}
try {
return getDfs().addCacheDirective(info);
} catch (IOException e) {
throw new ImpalaRuntimeException(e.getMessage(), e);
}
}
/**
 * Updates the cache directive for a table and updates the metastore parameters.
 * Returns the cache directive ID.
*/
public static long modifyCacheDirective(Long id,
org.apache.hadoop.hive.metastore.api.Table table,
String poolName, short replication) throws ImpalaRuntimeException {
Preconditions.checkNotNull(id);
HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()),
poolName, replication);
table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
return id;
}
/**
 * Updates the cache directive for a partition and updates the metastore parameters.
 * Returns the cache directive ID.
*/
public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName,
short replication) throws ImpalaRuntimeException {
Preconditions.checkNotNull(id);
HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
poolName, replication);
part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
return id;
}
/**
 * Updates an existing cache directive in place, rather than submitting a new one,
 * to avoid creating multiple directives for the same path.
*/
private static void modifyCacheDirective(Long id, Path path, String poolName,
short replication) throws ImpalaRuntimeException {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(id);
Preconditions.checkState(poolName != null && !poolName.isEmpty());
CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
.setId(id)
.setPool(poolName)
.setReplication(replication)
.setPath(path).build();
if (LOG.isTraceEnabled()) {
LOG.trace("Modifying cache directive: " + info.toString());
}
try {
getDfs().modifyCacheDirective(info);
} catch (IOException e) {
throw new ImpalaRuntimeException(e.getMessage(), e);
}
}
/**
 * Removes the given cache directive if it exists, uncaching the data. If the
 * cache request does not exist in HDFS, no error is returned.
* Throws an ImpalaRuntimeException if there was any problem removing the
* directive.
*/
private static void removeDirective(long directiveId) throws ImpalaRuntimeException {
if (LOG.isTraceEnabled()) LOG.trace("Removing cache directive id: " + directiveId);
try {
getDfs().removeCacheDirective(directiveId);
} catch (IOException e) {
// There is no special exception type for the case where a directive ID does not
// exist so we must inspect the error message.
if (e.getMessage().contains("No directive with ID")) return;
throw new ImpalaRuntimeException(e.getMessage(), e);
}
}
/**
 * Gets the cache directive matching the given ID. Throws an ImpalaRuntimeException
 * if the lookup fails or no matching directive was found.
*/
private static CacheDirectiveEntry getDirective(long directiveId)
throws ImpalaRuntimeException {
if (LOG.isTraceEnabled()) {
LOG.trace("Getting cache directive id: " + directiveId);
}
CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder()
.setId(directiveId)
.build();
try {
RemoteIterator<CacheDirectiveEntry> itr = getDfs().listCacheDirectives(filter);
if (itr.hasNext()) return itr.next();
} catch (IOException e) {
      // Covers connection problems with HDFS as well as directive-not-found errors
      // reported by the NameNode.
throw new ImpalaRuntimeException(e.getMessage(), e);
}
    throw new ImpalaRuntimeException(
        "HDFS cache directive filter returned an empty result. This should never " +
        "happen.");
}
/**
 * Helper method for looking up the replication factor in the thrift caching
 * structure, falling back to the default replication factor when it is not set.
*/
public static short getReplicationOrDefault(THdfsCachingOp op) {
return op.isSetReplication() ? op.getReplication() :
JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
}
/**
* Returns a boolean indicating if the given thrift caching operation would perform an
* update on an already existing cache directive.
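 *
 * A sketch of the intended call pattern, assuming params already contains a valid
 * cache_directive_id (the op and msTable handles are hypothetical):
 * <pre>{@code
 *   Map<String, String> params = msTable.getParameters();
 *   if (HdfsCachingUtil.isUpdateOp(op, params)) {
 *     Long id = HdfsCachingUtil.getCacheDirectiveId(params);
 *     HdfsCachingUtil.modifyCacheDirective(id, msTable, op.getCache_pool_name(),
 *         HdfsCachingUtil.getReplicationOrDefault(op));
 *   }
 * }</pre>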
*/
public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
throws ImpalaRuntimeException {
Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME));
CacheDirectiveEntry entry = getDirective(directiveId);
Preconditions.checkNotNull(entry);
    // A request against a different pool is not an update of the existing directive.
if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
return false;
}
// Check cache replication factor
    if ((op.isSetReplication() &&
            op.getReplication() != entry.getInfo().getReplication()) ||
        (!op.isSetReplication() && entry.getInfo().getReplication() !=
            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR)) {
      return true;
    }
return false;
}
/**
* Validates the properties of the chosen cache pool. Throws on error.
*/
public static void validateCachePool(THdfsCachingOp op, Long directiveId,
TableName table, HdfsPartition partition)
throws ImpalaRuntimeException {
CacheDirectiveEntry entry = getDirective(directiveId);
Preconditions.checkNotNull(entry);
if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
throw new ImpalaRuntimeException(String.format("Cannot cache partition in " +
"pool '%s' because it is already cached in '%s'. To change the cache " +
"pool for this partition, first uncache using: ALTER TABLE %s.%s " +
"%sSET UNCACHED", op.getCache_pool_name(),
entry.getInfo().getPool(), table.getDb(), table,
// Insert partition string if partition non null
partition != null ? String.format(" PARTITION(%s) ",
partition.getPartitionName().replaceAll("/", ", ")) : ""));
}
}
/**
 * Convenience overload of validateCachePool() above for table-level cache
 * directives, without a specific partition.
*/
public static void validateCachePool(THdfsCachingOp op, Long directiveId,
TableName table) throws ImpalaRuntimeException {
validateCachePool(op, directiveId, table, null);
}
/**
 * Returns true if the parameter map contains a cache directive ID that still
 * exists on the NameNode. If the cache directive ID does not exist, we remove the
 * value from the parameter map, issue a log message and return false. As the value
 * is not written back to the Hive MS from this method, the result is only valid
 * until the next metadata fetch. Lastly, we update the cache replication factor in
 * the parameters with the value read from HDFS.
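 *
 * A typical call-site sketch (msTbl is a hypothetical metastore table handle):
 * <pre>{@code
 *   // During metadata load: treat the table as cached only if its directive
 *   // still exists on the NameNode.
 *   boolean isMarkedCached =
 *       HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
 * }</pre>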
*/
public static boolean validateCacheParams(Map<String, String> params) {
Long directiveId = getCacheDirectiveId(params);
if (directiveId == null) return false;
CacheDirectiveEntry entry = null;
try {
entry = getDirective(directiveId);
} catch (ImpalaRuntimeException e) {
      if (e.getCause() instanceof RemoteException) {
// This exception signals that the cache directive no longer exists.
LOG.error("Cache directive does not exist", e);
params.remove(CACHE_DIR_ID_PROP_NAME);
params.remove(CACHE_DIR_REPLICATION_PROP_NAME);
} else {
// This exception signals that there was a connection problem with HDFS.
LOG.error("IO Exception, possible connectivity issues with HDFS", e);
}
return false;
}
Preconditions.checkNotNull(entry);
    // On the upgrade path the property might not exist; if it exists and differs
    // from the value in HDFS, issue a warning.
String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
if (replicationFactor != null &&
Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) {
LOG.info("Replication factor for entry in HDFS differs from value in Hive MS: " +
entry.getInfo().getPath().toString() + " " +
entry.getInfo().getReplication().toString() + " != " +
params.get(CACHE_DIR_REPLICATION_PROP_NAME));
}
params.put(CACHE_DIR_REPLICATION_PROP_NAME,
String.valueOf(entry.getInfo().getReplication()));
return true;
}
}