/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
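/**
 * Utility methods for working with a tablet's rfiles: reducing sets of index files, finding split points, and estimating sizes from rfile indexes.
 */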
public class FileUtil {
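  /**
   * Holds the first and last key of a file, used to report the file's first and last row.
   */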
public static class FileInfo {
Key firstKey = new Key();
Key lastKey = new Key();
public FileInfo(Key firstKey, Key lastKey) {
this.firstKey = firstKey;
this.lastKey = lastKey;
}
public Text getFirstRow() {
return firstKey.getRow();
}
public Text getLastRow() {
return lastKey.getRow();
}
}
private static final Logger log = LoggerFactory.getLogger(FileUtil.class);
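  /**
   * Creates and reserves a uniquely named temporary directory under one of the configured base URIs. A marker file is created to claim the directory, since
   * on some versions of hadoop two clients concurrently creating the same directory can both appear to succeed.
   */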
private static Path createTmpDir(AccumuloConfiguration acuConf, VolumeManager fs) throws IOException {
String accumuloDir = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris());
Path result = null;
while (result == null) {
result = new Path(accumuloDir + Path.SEPARATOR + "tmp/idxReduce_" + String.format("%09d", new Random().nextInt(Integer.MAX_VALUE)));
try {
fs.getFileStatus(result);
result = null;
continue;
} catch (FileNotFoundException fne) {
// found an unused temp directory
}
fs.mkdirs(result);
// try to reserve the tmp dir
// In some versions of hadoop, two clients concurrently trying to create the same directory might both return true
// Creating a file is not subject to this, so create a special file to make sure we solely will use this directory
if (!fs.createNewFile(new Path(result, "__reserve")))
result = null;
}
return result;
}
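  /**
   * Merges index entries from groups of at most maxFiles input files into new rfiles under tmpDir, keeping only keys in the range (prevEndRow, endRow], and
   * recurses until no more than maxFiles files remain. Returns the input paths unchanged if there are already few enough files.
   */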
public static Collection<String> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, VolumeManager fs, Text prevEndRow, Text endRow,
Collection<String> mapFiles, int maxFiles, Path tmpDir, int pass) throws IOException {
ArrayList<String> paths = new ArrayList<>(mapFiles);
if (paths.size() <= maxFiles)
return paths;
String newDir = String.format("%s/pass_%04d", tmpDir, pass);
int start = 0;
ArrayList<String> outFiles = new ArrayList<>();
int count = 0;
while (start < paths.size()) {
int end = Math.min(maxFiles + start, paths.size());
List<String> inFiles = paths.subList(start, end);
start = end;
String newMapFile = String.format("%s/%04d.%s", newDir, count++, RFile.EXTENSION);
outFiles.add(newMapFile);
FileSystem ns = fs.getVolumeByPath(new Path(newMapFile)).getFileSystem();
      FileSKVWriter writer = new RFileOperations().newWriterBuilder().forFile(newMapFile, ns, ns.getConf()).withTableConfiguration(acuConf).build();
writer.startDefaultLocalityGroup();
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(inFiles.size());
      try {
        for (String s : inFiles) {
          ns = fs.getVolumeByPath(new Path(s)).getFileSystem();
          iters.add(FileOperations.getInstance().newIndexReaderBuilder().forFile(s, ns, ns.getConf()).withTableConfiguration(acuConf).build());
        }
MultiIterator mmfi = new MultiIterator(iters, true);
while (mmfi.hasTop()) {
Key key = mmfi.getTopKey();
boolean gtPrevEndRow = prevEndRow == null || key.compareRow(prevEndRow) > 0;
boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
if (gtPrevEndRow && lteEndRow)
writer.append(key, new Value(new byte[0]));
if (!lteEndRow)
break;
mmfi.next();
}
      } finally {
        // every reader was added to iters, so closing iters closes each reader exactly once
for (SortedKeyValueIterator<Key,Value> r : iters)
try {
if (r != null)
((FileSKVIterator) r).close();
} catch (IOException e) {
// continue closing
log.error("{}", e.getMessage(), e);
}
try {
writer.close();
} catch (IOException e) {
log.error("{}", e.getMessage(), e);
throw e;
}
}
}
return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
}
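  /**
   * Convenience overload of {@link #findMidPoint(VolumeManager, AccumuloConfiguration, Text, Text, Collection, double, boolean)} that always uses the index.
   */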
public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
double minSplit) throws IOException {
return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);
}
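  /**
   * Estimates the fraction of entries in the range (prevEndRow, endRow] whose row is less than or equal to splitRow, based on the files' index entries.
   * Returns .5 when the indexes contain no entries for the range. The estimate is computed as (numLte + 1) / (numKeys + 2) so it is never exactly 0 or 1.
   */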
public static double estimatePercentageLTE(VolumeManager fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
Text splitRow) throws IOException {
Configuration conf = CachedConfiguration.getInstance();
Path tmpDir = null;
int maxToOpen = acuconf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
ArrayList<FileSKVIterator> readers = new ArrayList<>(mapFiles.size());
try {
if (mapFiles.size() > maxToOpen) {
tmpDir = createTmpDir(acuconf, fs);
log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
long t1 = System.currentTimeMillis();
mapFiles = reduceFiles(acuconf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
long t2 = System.currentTimeMillis();
log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
}
if (prevEndRow == null)
prevEndRow = new Text();
long numKeys = 0;
numKeys = countIndexEntries(acuconf, prevEndRow, endRow, mapFiles, true, conf, fs, readers);
if (numKeys == 0) {
// not enough info in the index to answer the question, so instead of going to
// the data just punt and return .5
return .5;
}
      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(readers);
MultiIterator mmfi = new MultiIterator(iters, true);
// skip the prevendrow
while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0) {
mmfi.next();
}
int numLte = 0;
while (mmfi.hasTop() && mmfi.getTopKey().compareRow(splitRow) <= 0) {
numLte++;
mmfi.next();
}
if (numLte > numKeys) {
// something went wrong
throw new RuntimeException("numLte > numKeys " + numLte + " " + numKeys + " " + prevEndRow + " " + endRow + " " + splitRow + " " + mapFiles);
}
// do not want to return 0% or 100%, so add 1 and 2 below
return (numLte + 1) / (double) (numKeys + 2);
} finally {
cleanupIndexOp(acuconf, tmpDir, fs, readers);
}
}
  /**
   * @param mapFiles
   *          - list of map files in which to find the mid point key
   *
   *          ISSUES : This method uses the index files to find the mid point. If the map files have different index intervals this method will not return an
   *          accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown.
   */
public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
double minSplit, boolean useIndex) throws IOException {
Configuration conf = CachedConfiguration.getInstance();
Collection<String> origMapFiles = mapFiles;
Path tmpDir = null;
int maxToOpen = acuConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
ArrayList<FileSKVIterator> readers = new ArrayList<>(mapFiles.size());
try {
if (mapFiles.size() > maxToOpen) {
if (!useIndex)
throw new IOException("Cannot find mid point using data files, too many " + mapFiles.size());
tmpDir = createTmpDir(acuConf, fs);
log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
long t1 = System.currentTimeMillis();
mapFiles = reduceFiles(acuConf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
long t2 = System.currentTimeMillis();
log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
}
if (prevEndRow == null)
prevEndRow = new Text();
long t1 = System.currentTimeMillis();
long numKeys = 0;
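      // reduced files store index entries as data, so scan them rather than reading their own indexes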
numKeys = countIndexEntries(acuConf, prevEndRow, endRow, mapFiles, tmpDir == null ? useIndex : false, conf, fs, readers);
if (numKeys == 0) {
if (useIndex) {
log.warn("Failed to find mid point using indexes, falling back to data files which is slower. No entries between " + prevEndRow + " and " + endRow
+ " for " + mapFiles);
// need to pass original map files, not possibly reduced indexes
return findMidPoint(fs, acuConf, prevEndRow, endRow, origMapFiles, minSplit, false);
}
throw new IOException("Failed to find mid point, no entries between " + prevEndRow + " and " + endRow + " for " + mapFiles);
}
      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<>(readers);
MultiIterator mmfi = new MultiIterator(iters, true);
// skip the prevendrow
while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0)
mmfi.next();
// read half of the keys in the index
TreeMap<Double,Key> ret = new TreeMap<>();
Key lastKey = null;
long keysRead = 0;
Key keyBeforeMidPoint = null;
long keyBeforeMidPointPosition = 0;
while (keysRead < numKeys / 2) {
if (lastKey != null && !lastKey.equals(mmfi.getTopKey(), PartialKey.ROW) && (keysRead - 1) / (double) numKeys >= minSplit) {
keyBeforeMidPoint = new Key(lastKey);
keyBeforeMidPointPosition = keysRead - 1;
}
if (lastKey == null)
lastKey = new Key();
lastKey.set(mmfi.getTopKey());
keysRead++;
// consume minimum
mmfi.next();
}
if (keyBeforeMidPoint != null)
ret.put(keyBeforeMidPointPosition / (double) numKeys, keyBeforeMidPoint);
long t2 = System.currentTimeMillis();
log.debug(String.format("Found midPoint from indexes in %6.2f secs.%n", ((t2 - t1) / 1000.0)));
ret.put(.5, mmfi.getTopKey());
// sanity check
for (Key key : ret.values()) {
boolean inRange = (key.compareRow(prevEndRow) > 0 && (endRow == null || key.compareRow(endRow) < 1));
if (!inRange) {
throw new IOException("Found mid point is not in range " + key + " " + prevEndRow + " " + endRow + " " + mapFiles);
}
}
return ret;
} finally {
cleanupIndexOp(acuConf, tmpDir, fs, readers);
}
}
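  /**
   * Closes the given index readers, then recursively deletes tmpDir if one was created and it still exists.
   */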
protected static void cleanupIndexOp(AccumuloConfiguration acuConf, Path tmpDir, VolumeManager fs, ArrayList<FileSKVIterator> readers) throws IOException {
// close all of the index sequence files
for (FileSKVIterator r : readers) {
try {
if (r != null)
r.close();
} catch (IOException e) {
// okay, try to close the rest anyway
log.error("{}", e.getMessage(), e);
}
}
if (tmpDir != null) {
Volume v = fs.getVolumeByPath(tmpDir);
if (v.getFileSystem().exists(tmpDir)) {
fs.deleteRecursively(tmpDir);
return;
}
log.error("Did not delete tmp dir because it wasn't a tmp dir " + tmpDir);
}
}
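  /**
   * Counts the entries in the range (prevEndRow, endRow] across the given files, reading each file's index when useIndex is true and scanning its data
   * otherwise. A fresh reader is reopened for each file and added to readers so the caller can iterate the same entries.
   */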
private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles, boolean useIndex,
Configuration conf, VolumeManager fs, ArrayList<FileSKVIterator> readers) throws IOException {
long numKeys = 0;
// count the total number of index entries
for (String ref : mapFiles) {
FileSKVIterator reader = null;
Path path = new Path(ref);
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
try {
if (useIndex)
reader = FileOperations.getInstance().newIndexReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
else
reader = FileOperations.getInstance().newScanReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
.overRange(new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false).build();
while (reader.hasTop()) {
Key key = reader.getTopKey();
if (endRow != null && key.compareRow(endRow) > 0)
break;
else if (prevEndRow == null || key.compareRow(prevEndRow) > 0)
numKeys++;
reader.next();
}
} finally {
try {
if (reader != null)
reader.close();
} catch (IOException e) {
log.error("{}", e.getMessage(), e);
}
}
if (useIndex)
readers.add(FileOperations.getInstance().newIndexReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build());
else
readers.add(FileOperations.getInstance().newScanReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
.overRange(new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false).build());
}
return numKeys;
}
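  /**
   * Attempts to read the first and last key of each file; files that are empty or cannot be read are omitted from the result.
   */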
public static Map<FileRef,FileInfo> tryToGetFirstAndLastRows(VolumeManager fs, AccumuloConfiguration acuConf, Set<FileRef> mapfiles) {
HashMap<FileRef,FileInfo> mapFilesInfo = new HashMap<>();
long t1 = System.currentTimeMillis();
for (FileRef mapfile : mapfiles) {
FileSKVIterator reader = null;
FileSystem ns = fs.getVolumeByPath(mapfile.path()).getFileSystem();
try {
reader = FileOperations.getInstance().newReaderBuilder().forFile(mapfile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
Key firstKey = reader.getFirstKey();
if (firstKey != null) {
mapFilesInfo.put(mapfile, new FileInfo(firstKey, reader.getLastKey()));
}
} catch (IOException ioe) {
log.warn("Failed to read map file to determine first and last key : " + mapfile, ioe);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ioe) {
log.warn("failed to close " + mapfile, ioe);
}
}
}
}
long t2 = System.currentTimeMillis();
log.debug(String.format("Found first and last keys for %d map files in %6.2f secs", mapfiles.size(), (t2 - t1) / 1000.0));
return mapFilesInfo;
}
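  /**
   * Returns the greatest last key across the given files, or null if all of the files are empty.
   */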
public static WritableComparable<Key> findLastKey(VolumeManager fs, AccumuloConfiguration acuConf, Collection<FileRef> mapFiles) throws IOException {
Key lastKey = null;
for (FileRef ref : mapFiles) {
Path path = ref.path();
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
.seekToBeginning().build();
try {
if (!reader.hasTop())
// file is empty, so there is no last key
continue;
Key key = reader.getLastKey();
if (lastKey == null || key.compareTo(lastKey) > 0)
lastKey = key;
} finally {
try {
if (reader != null)
reader.close();
} catch (IOException e) {
log.error("{}", e.getMessage(), e);
}
}
}
return lastKey;
}
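  // simple mutable long, used to tally index entries per extent in estimateSizes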
  private static class MLong {
    long l;

    public MLong(long i) {
      l = i;
    }
  }
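  /**
   * Estimates the portion of fileSize belonging to each extent by counting the index entries whose row falls in the extent and apportioning the file size by
   * that fraction. An extent with no matching index entries is counted as having one so its estimate is nonzero.
   */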
public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile, long fileSize, List<KeyExtent> extents, Configuration conf,
VolumeManager fs) throws IOException {
long totalIndexEntries = 0;
Map<KeyExtent,MLong> counts = new TreeMap<>();
for (KeyExtent keyExtent : extents)
counts.put(keyExtent, new MLong(0));
Text row = new Text();
FileSystem ns = fs.getVolumeByPath(mapFile).getFileSystem();
FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder().forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
.build();
try {
while (index.hasTop()) {
Key key = index.getTopKey();
totalIndexEntries++;
key.getRow(row);
for (Entry<KeyExtent,MLong> entry : counts.entrySet())
if (entry.getKey().contains(row))
entry.getValue().l++;
index.next();
}
} finally {
try {
if (index != null)
index.close();
} catch (IOException e) {
// continue with next file
log.error("{}", e.getMessage(), e);
}
}
Map<KeyExtent,Long> results = new TreeMap<>();
for (KeyExtent keyExtent : extents) {
double numEntries = counts.get(keyExtent).l;
if (numEntries == 0)
numEntries = 1;
long estSize = (long) ((numEntries / totalIndexEntries) * fileSize);
results.put(keyExtent, estSize);
}
return results;
}
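  /**
   * Converts a collection of file references to their path strings.
   */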
public static Collection<String> toPathStrings(Collection<FileRef> refs) {
ArrayList<String> ret = new ArrayList<>();
for (FileRef fileRef : refs) {
ret.add(fileRef.path().toString());
}
return ret;
}
}