/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.store.hdfs;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.codahale.metrics.MetricRegistry;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @deprecated since 8.6; see <a href="https://cwiki.apache.org/confluence/display/SOLR/Deprecations">Deprecations</a>
*/
public class HdfsLocalityReporter implements SolrInfoBean, SolrMetricProducer {
  public static final String LOCALITY_BYTES_TOTAL = "locality.bytes.total";
  public static final String LOCALITY_BYTES_LOCAL = "locality.bytes.local";
  public static final String LOCALITY_BYTES_RATIO = "locality.bytes.ratio";
  public static final String LOCALITY_BLOCKS_TOTAL = "locality.blocks.total";
  public static final String LOCALITY_BLOCKS_LOCAL = "locality.blocks.local";
  public static final String LOCALITY_BLOCKS_RATIO = "locality.blocks.ratio";

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private String hostname;
  private final ConcurrentMap<HdfsDirectory,ConcurrentMap<FileStatus,BlockLocation[]>> cache;

  private final Set<String> metricNames = ConcurrentHashMap.newKeySet();
  private SolrMetricsContext solrMetricsContext;

  public HdfsLocalityReporter() {
    cache = new ConcurrentHashMap<>();
  }

  /**
   * Set the host name to use when determining locality.
   *
   * @param hostname The name of this host; should correspond to what HDFS Data Nodes think this is.
   */
  public void setHost(String hostname) {
    this.hostname = hostname;
  }
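
  // Illustrative usage sketch (not part of this class; variable names are assumptions
  // for the example only). Callers are expected to set the host before metrics are
  // gathered, and the value must match the hostname the HDFS DataNodes report:
  //
  //   HdfsLocalityReporter reporter = new HdfsLocalityReporter();
  //   reporter.setHost("datanode-3.example.com"); // the DataNodes' view of this host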

  @Override
  public String getName() {
    return "hdfs-locality";
  }

  @Override
  public String getDescription() {
    return "Provides metrics for HDFS data locality.";
  }

  @Override
  public Category getCategory() {
    return Category.OTHER;
  }

  @Override
  public Set<String> getMetricNames() {
    return metricNames;
  }

  @Override
  public MetricRegistry getMetricRegistry() {
    return solrMetricsContext != null ? solrMetricsContext.getMetricRegistry() : null;
  }

  @Override
  public SolrMetricsContext getSolrMetricsContext() {
    return solrMetricsContext;
  }

/**
* Provide statistics on HDFS block locality, both in terms of bytes and block counts.
*/
@Override
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
solrMetricsContext = parentContext.getChildContext(this);
MetricsMap metricsMap = new MetricsMap(map -> {
long totalBytes = 0;
long localBytes = 0;
int totalCount = 0;
int localCount = 0;
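      // Walk every registered directory: prune entries whose HdfsDirectory has been
      // closed, and tally total/local bytes and block counts for the rest.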
for (Iterator<HdfsDirectory> iterator = cache.keySet().iterator(); iterator.hasNext();) {
HdfsDirectory hdfsDirectory = iterator.next();
if (hdfsDirectory.isClosed()) {
iterator.remove();
} else {
try {
refreshDirectory(hdfsDirectory);
Map<FileStatus,BlockLocation[]> blockMap = cache.get(hdfsDirectory);
// For every block in every file in this directory, count it
for (BlockLocation[] locations : blockMap.values()) {
for (BlockLocation bl : locations) {
totalBytes += bl.getLength();
totalCount++;
if (Arrays.asList(bl.getHosts()).contains(hostname)) {
localBytes += bl.getLength();
localCount++;
}
}
}
          } catch (IOException e) {
            // Pass the exception as the final argument (not a {} placeholder) so SLF4J
            // logs the stack trace rather than just the exception's toString().
            log.warn("Could not retrieve locality information for {}", hdfsDirectory.getHdfsDirPath(), e);
          }
}
}
map.put(LOCALITY_BYTES_TOTAL, totalBytes);
map.put(LOCALITY_BYTES_LOCAL, localBytes);
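      // localBytes can never exceed totalBytes, so checking localBytes == 0 also rules
      // out a 0/0 division below when no blocks have been observed yet (the same
      // reasoning applies to the block-count ratio further down).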
if (localBytes == 0) {
map.put(LOCALITY_BYTES_RATIO, 0);
} else {
map.put(LOCALITY_BYTES_RATIO, localBytes / (double) totalBytes);
}
map.put(LOCALITY_BLOCKS_TOTAL, totalCount);
map.put(LOCALITY_BLOCKS_LOCAL, localCount);
if (localCount == 0) {
map.put(LOCALITY_BLOCKS_RATIO, 0);
} else {
map.put(LOCALITY_BLOCKS_RATIO, localCount / (double) totalCount);
}
});
solrMetricsContext.gauge(this, metricsMap, true, "hdfsLocality", getCategory().toString(), scope);
}
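
  // Illustrative sketch (names below are assumptions for the example, not APIs defined
  // here): after initializeMetrics runs, the six locality values are exposed through the
  // parent context's registry as a single "hdfsLocality" MetricsMap gauge, conventionally
  // keyed as "<category>.<scope>.<name>":
  //
  //   reporter.initializeMetrics(parentContext, "core1");
  //   // e.g. OTHER.core1.hdfsLocality -> {locality.bytes.ratio=0.85, ...}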

  /**
   * Add a directory for block locality reporting. This directory will continue to be checked until its close method
   * has been called.
   *
   * @param dir
   *          The directory to keep metrics on.
   */
  public void registerDirectory(HdfsDirectory dir) {
    if (log.isInfoEnabled()) {
      log.info("Registering directory {} for locality metrics.", dir.getHdfsDirPath());
    }
    cache.put(dir, new ConcurrentHashMap<>());
  }
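
  // Illustrative pairing (the HdfsDirectory constructor arguments below are assumptions
  // for the example): a directory factory would register each HdfsDirectory it opens so
  // that its blocks are included in the tallies until the directory is closed:
  //
  //   HdfsDirectory dir = new HdfsDirectory(hdfsDirPath, configuration); // args assumed
  //   reporter.registerDirectory(dir); // tracked until dir.isClosed() returns true
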
/**
* Update the cached block locations for the given directory. This includes deleting any files that no longer exist in
* the file system and adding any new files that have shown up.
*
* @param dir
* The directory to refresh
* @throws IOException
* If there is a problem getting info from HDFS
*/
private void refreshDirectory(HdfsDirectory dir) throws IOException {
Map<FileStatus,BlockLocation[]> directoryCache = cache.get(dir);
Set<FileStatus> cachedStatuses = directoryCache.keySet();
FileSystem fs = dir.getFileSystem();
FileStatus[] statuses = fs.listStatus(dir.getHdfsDirPath());
List<FileStatus> statusList = Arrays.asList(statuses);
log.debug("Updating locality information for: {}", statusList);
// Keep only the files that still exist
cachedStatuses.retainAll(statusList);
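    // Note: FileStatus equality is based on the file path, so a file modified in place
    // keeps its previously cached block locations until it disappears from the listing.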
// Fill in missing entries in the cache
for (FileStatus status : statusList) {
if (!status.isDirectory() && !directoryCache.containsKey(status)) {
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());
directoryCache.put(status, locations);
}
}
}
}