/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.accumulo.server.client;

import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ServerClient;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.clientImpl.Translator;
import org.apache.accumulo.core.clientImpl.Translators;
import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.StopWatch;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.util.FileUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.htrace.wrappers.TraceRunnable;
import org.apache.thrift.TServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
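
/**
 * Assigns a set of bulk-imported map files to the tablets they overlap and asks the hosting
 * tablet servers to load them. Failed assignments are retried with backoff until they succeed or
 * the configured retry limit is reached.
 *
 * <p>
 * A minimal usage sketch (the transaction id, table id, and file list below are hypothetical):
 *
 * <pre>
 * {@code
 * // returns the paths of any files that could not be imported
 * List<String> filesNotLoaded = BulkImporter.bulkLoad(serverContext, 42L, "2a", files, false);
 * }
 * </pre>
 */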
public class BulkImporter {
private static final Logger log = LoggerFactory.getLogger(BulkImporter.class);
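
/**
 * Imports the given files into the table and returns the paths of any files that could not be
 * assigned to any tablet.
 */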
public static List<String> bulkLoad(ServerContext context, long tid, String tableId,
List<String> files, boolean setTime) throws IOException {
AssignmentStats stats = new BulkImporter(context, tid, tableId, setTime).importFiles(files);
List<String> result = new ArrayList<>();
for (Path p : stats.completeFailures.keySet()) {
result.add(p.toString());
}
return result;
}
private StopWatch<Timers> timer;
private enum Timers {
EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
}
private final ServerContext context;
private String tableId;
private long tid;
private boolean setTime;
public BulkImporter(ServerContext context, long tid, String tableId, boolean setTime) {
this.context = context;
this.tid = tid;
this.tableId = tableId;
this.setTime = setTime;
}
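
/**
 * Runs the full import: finds the tablets each map file overlaps (in parallel), attempts the
 * assignments, and retries any failures with an increasing sleep (capped at one minute) until
 * each file is assigned or exceeds the TSERV_BULK_RETRY limit, at which point it is recorded as
 * a complete failure.
 */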
public AssignmentStats importFiles(List<String> files) {
int numThreads = context.getConfiguration().getCount(Property.TSERV_BULK_PROCESS_THREADS);
int numAssignThreads =
context.getConfiguration().getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
timer = new StopWatch<>(Timers.class);
timer.start(Timers.TOTAL);
final VolumeManager fs = context.getVolumeManager();
Set<Path> paths = new HashSet<>();
for (String file : files) {
paths.add(new Path(file));
}
AssignmentStats assignmentStats = new AssignmentStats(paths.size());
final Map<Path,List<KeyExtent>> completeFailures =
Collections.synchronizedSortedMap(new TreeMap<>());
ClientService.Client client = null;
final TabletLocator locator = TabletLocator.getLocator(context, TableId.of(tableId));
try {
final Map<Path,List<TabletLocation>> assignments =
Collections.synchronizedSortedMap(new TreeMap<>());
timer.start(Timers.EXAMINE_MAP_FILES);
ExecutorService threadPool =
Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
for (Path path : paths) {
final Path mapFile = path;
Runnable getAssignments = new Runnable() {
@Override
public void run() {
List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
try {
tabletsToAssignMapFileTo = findOverlappingTablets(context, fs, locator, mapFile);
} catch (Exception ex) {
log.warn("Unable to find tablets that overlap file " + mapFile, ex);
}
log.debug("Map file {} found to overlap {} tablets", mapFile,
tabletsToAssignMapFileTo.size());
if (tabletsToAssignMapFileTo.isEmpty()) {
List<KeyExtent> empty = Collections.emptyList();
completeFailures.put(mapFile, empty);
} else
assignments.put(mapFile, tabletsToAssignMapFileTo);
}
};
threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
timer.stop(Timers.EXAMINE_MAP_FILES);
assignmentStats.attemptingAssignments(assignments);
Map<Path,List<KeyExtent>> assignmentFailures =
assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads);
assignmentStats.assignmentsFailed(assignmentFailures);
Map<Path,Integer> failureCount = new TreeMap<>();
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
failureCount.put(entry.getKey(), 1);
long sleepTime = 2 * 1000;
while (!assignmentFailures.isEmpty()) {
sleepTime = Math.min(sleepTime * 2, 60 * 1000);
locator.invalidateCache();
// the assumption about assignment failures is that they are caused by a split
// happening or by a missing tablet location
//
// for splits we need to find the children key extents that cover the
// same key range and are contiguous (no holes, no overlap)
timer.start(Timers.SLEEP);
sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
timer.stop(Timers.SLEEP);
log.debug("Trying to assign {} map files that previously failed on some key extents",
assignmentFailures.size());
assignments.clear();
// for failed key extents, try to find children key extents to
// assign to
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
Iterator<KeyExtent> keListIter = entry.getValue().iterator();
List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<>();
while (keListIter.hasNext()) {
KeyExtent ke = keListIter.next();
timer.start(Timers.QUERY_METADATA);
try {
tabletsToAssignMapFileTo
.addAll(findOverlappingTablets(context, fs, locator, entry.getKey(), ke));
keListIter.remove();
} catch (Exception ex) {
log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
}
timer.stop(Timers.QUERY_METADATA);
}
if (!tabletsToAssignMapFileTo.isEmpty())
assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
}
assignmentStats.attemptingAssignments(assignments);
Map<Path,List<KeyExtent>> assignmentFailures2 =
assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads);
assignmentStats.assignmentsFailed(assignmentFailures2);
// merge assignmentFailures2 into assignmentFailures
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
Integer fc = failureCount.get(entry.getKey());
if (fc == null)
fc = 0;
failureCount.put(entry.getKey(), fc + 1);
}
// remove map files that have no more key extents to assign
assignmentFailures.values().removeIf(List::isEmpty);
Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
for (Entry<Path,Integer> entry : failureIter) {
int retries = context.getConfiguration().getCount(Property.TSERV_BULK_RETRY);
if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
log.error("Map file {} failed more than {} times, giving up.", entry.getKey(), retries);
completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
assignmentFailures.remove(entry.getKey());
}
}
}
assignmentStats.assignmentsAbandoned(completeFailures);
Set<Path> failedFailures = processFailures(completeFailures);
assignmentStats.unrecoveredMapFiles(failedFailures);
timer.stop(Timers.TOTAL);
printReport(paths);
return assignmentStats;
} finally {
if (client != null) {
ServerClient.close(client);
}
}
}
private void printReport(Set<Path> paths) {
long totalTime = 0;
for (Timers t : Timers.values()) {
if (t == Timers.TOTAL)
continue;
totalTime += timer.get(t);
}
List<String> files = new ArrayList<>();
for (Path path : paths) {
files.add(path.getName());
}
Collections.sort(files);
log.debug("BULK IMPORT TIMING STATISTICS");
log.debug("Files: {}", files);
log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s",
timer.getSecs(Timers.EXAMINE_MAP_FILES),
100.0 * timer.get(Timers.EXAMINE_MAP_FILES) / timer.get(Timers.TOTAL), "%"));
log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME,
timer.getSecs(Timers.QUERY_METADATA),
100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
log.debug(String.format("Import Map Files : %,10.2f secs %6.2f%s",
timer.getSecs(Timers.IMPORT_MAP_FILES),
100.0 * timer.get(Timers.IMPORT_MAP_FILES) / timer.get(Timers.TOTAL), "%"));
log.debug(
String.format("Sleep : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
log.debug(String.format("Misc : %,10.2f secs %6.2f%s",
(timer.get(Timers.TOTAL) - totalTime) / 1000.0,
100.0 * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
}
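
/**
 * Logs the map files that could never be assigned. Currently always returns an empty set, so no
 * file is ever reported as unrecoverable.
 */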
private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
// if a map file was not assigned to any tablets, we should probably just move it;
// that is not currently done
Set<Entry<Path,List<KeyExtent>>> es = completeFailures.entrySet();
if (completeFailures.isEmpty())
return Collections.emptySet();
log.debug("The following map files failed ");
for (Entry<Path,List<KeyExtent>> entry : es) {
List<KeyExtent> extents = entry.getValue();
for (KeyExtent keyExtent : extents)
log.debug("\t{} -> {}", entry.getKey(), keyExtent);
}
return Collections.emptySet();
}
private class AssignmentInfo {
public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
this.ke = keyExtent;
this.estSize = estSize;
}
KeyExtent ke;
long estSize;
}
private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
List<KeyExtent> result = new ArrayList<>(locations.size());
for (TabletLocation tl : locations)
result.add(tl.tablet_extent);
return result;
}
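
/**
 * Estimates how much of each map file falls in each overlapping tablet. A file contained in a
 * single tablet is credited with its full size; otherwise FileUtil.estimateSizes is consulted,
 * falling back to an even split across the overlapping tablets if that estimation fails.
 */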
private Map<Path,List<AssignmentInfo>> estimateSizes(final VolumeManager vm,
Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
long t1 = System.currentTimeMillis();
final Map<Path,Long> mapFileSizes = new TreeMap<>();
try {
for (Path path : paths) {
FileSystem fs = vm.getFileSystemByPath(path);
mapFileSizes.put(path, fs.getContentSummary(path).getLength());
}
} catch (IOException e) {
log.error("Failed to get map files in for {}: {}", paths, e.getMessage(), e);
throw new RuntimeException(e);
}
final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<>());
ExecutorService threadPool =
Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
if (entry.getValue().size() == 1) {
TabletLocation tabletLocation = entry.getValue().get(0);
// if the tablet completely contains the map file, there is no need to
// estimate its size
ais.put(entry.getKey(), Collections.singletonList(
new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
continue;
}
Runnable estimationTask = new Runnable() {
@Override
public void run() {
Map<KeyExtent,Long> estimatedSizes = null;
try {
estimatedSizes = FileUtil.estimateSizes(context, entry.getKey(),
mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()));
} catch (IOException e) {
log.warn("Failed to estimate map file sizes {}", e.getMessage());
}
if (estimatedSizes == null) {
// estimation failed, do a simple estimation
estimatedSizes = new TreeMap<>();
long estSize =
(long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
for (TabletLocation tl : entry.getValue())
estimatedSizes.put(tl.tablet_extent, estSize);
}
List<AssignmentInfo> assignmentInfoList = new ArrayList<>(estimatedSizes.size());
for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
ais.put(entry.getKey(), assignmentInfoList);
}
};
threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Encountered InterruptedException while waiting for the threadPool to terminate.",
e);
throw new RuntimeException(e);
}
}
long t2 = System.currentTimeMillis();
log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
return ais;
}
private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
Map<KeyExtent,String> result = new HashMap<>();
for (List<TabletLocation> entry : assignments.values()) {
for (TabletLocation tl : entry) {
result.put(tl.tablet_extent, tl.tablet_location);
}
}
return result;
}
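
/**
 * Estimates per-tablet sizes for the given assignments and then sends them to the tablet
 * servers, timing the two phases separately.
 */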
private Map<Path,List<KeyExtent>> assignMapFiles(VolumeManager fs,
Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads,
int numMapThreads) {
timer.start(Timers.EXAMINE_MAP_FILES);
Map<Path,List<AssignmentInfo>> assignInfo =
estimateSizes(fs, assignments, paths, numMapThreads);
timer.stop(Timers.EXAMINE_MAP_FILES);
Map<Path,List<KeyExtent>> ret;
timer.start(Timers.IMPORT_MAP_FILES);
ret = assignMapFiles(assignInfo, locationsOf(assignments), numThreads);
timer.stop(Timers.IMPORT_MAP_FILES);
return ret;
}
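
/**
 * Sends one tablet server its batch of per-tablet map file assignments and records any extents
 * that could not be assigned so they can be retried.
 */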
private class AssignmentTask implements Runnable {
final Map<Path,List<KeyExtent>> assignmentFailures;
HostAndPort location;
private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
public AssignmentTask(Map<Path,List<KeyExtent>> assignmentFailures, String location,
Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
this.assignmentFailures = assignmentFailures;
this.location = HostAndPort.fromString(location);
this.assignmentsPerTablet = assignmentsPerTablet;
}
private void handleFailures(Collection<KeyExtent> failures, String message) {
failures.forEach(ke -> {
List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
synchronized (assignmentFailures) {
mapFiles.forEach(pathSize -> assignmentFailures
.computeIfAbsent(pathSize.path, k -> new ArrayList<>()).add(ke));
}
log.info("Could not assign {} map files to tablet {} because : {}. Will retry ...",
mapFiles.size(), ke, message);
});
}
@Override
public void run() {
HashSet<Path> uniqMapFiles = new HashSet<>();
for (List<PathSize> mapFiles : assignmentsPerTablet.values())
for (PathSize ps : mapFiles)
uniqMapFiles.add(ps.path);
log.debug("Assigning {} map files to {} tablets at {}", uniqMapFiles.size(),
assignmentsPerTablet.size(), location);
try {
List<KeyExtent> failures = assignMapFiles(context, location, assignmentsPerTablet);
handleFailures(failures, "Not Serving Tablet");
} catch (AccumuloException | AccumuloSecurityException e) {
handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
}
}
}
private class PathSize {
public PathSize(Path mapFile, long estSize) {
this.path = mapFile;
this.estSize = estSize;
}
Path path;
long estSize;
@Override
public String toString() {
return path + " " + estSize;
}
}
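
/**
 * Regroups the per-file assignments by tablet and then by tablet server, immediately recording a
 * failure for any tablet with no known location, and dispatches one AssignmentTask per tablet
 * server on a thread pool. Returns the map files (and the extents) that failed to be assigned.
 */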
private Map<Path,List<KeyExtent>> assignMapFiles(Map<Path,List<AssignmentInfo>> assignments,
Map<KeyExtent,String> locations, int numThreads) {
// group assignments by tablet
Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<>();
assignments.forEach((mapFile, tabletsToAssignMapFileTo) -> tabletsToAssignMapFileTo
.forEach(assignmentInfo -> assignmentsPerTablet
.computeIfAbsent(assignmentInfo.ke, k -> new ArrayList<>())
.add(new PathSize(mapFile, assignmentInfo.estSize))));
// group assignments by tabletserver
Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<>());
TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<>();
assignmentsPerTablet.forEach((ke, pathSizes) -> {
String location = locations.get(ke);
if (location == null) {
synchronized (assignmentFailures) {
pathSizes.forEach(pathSize -> assignmentFailures
.computeIfAbsent(pathSize.path, k -> new ArrayList<>()).add(ke));
}
log.warn(
"Could not assign {} map files to tablet {} because it had no location, will retry ...",
pathSizes.size(), ke);
} else {
assignmentsPerTabletServer.computeIfAbsent(location, k -> new TreeMap<>()).put(ke,
pathSizes);
}
});
ExecutorService threadPool =
Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer
.entrySet()) {
String location = entry.getKey();
threadPool.submit(new AssignmentTask(assignmentFailures, location, entry.getValue()));
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(
"Encountered InterruptedException while waiting for the thread pool to terminate.", e);
throw new RuntimeException(e);
}
}
return assignmentFailures;
}
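
/**
 * Builds the thrift MapFileInfo structures for one tablet server and invokes its bulkImport RPC,
 * returning the extents the server did not accept.
 */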
private List<KeyExtent> assignMapFiles(ClientContext context, HostAndPort location,
Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
throws AccumuloException, AccumuloSecurityException {
try {
long timeInMillis = context.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
TabletClientService.Iface client =
ThriftUtil.getTServerClient(location, context, timeInMillis);
try {
HashMap<KeyExtent,Map<String,org.apache.accumulo.core.dataImpl.thrift.MapFileInfo>> files =
new HashMap<>();
for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
HashMap<String,org.apache.accumulo.core.dataImpl.thrift.MapFileInfo> tabletFiles =
new HashMap<>();
files.put(entry.getKey(), tabletFiles);
for (PathSize pathSize : entry.getValue()) {
org.apache.accumulo.core.dataImpl.thrift.MapFileInfo mfi =
new org.apache.accumulo.core.dataImpl.thrift.MapFileInfo(pathSize.estSize);
tabletFiles.put(pathSize.path.toString(), mfi);
}
}
log.debug("Asking {} to bulk load {}", location, files);
List<TKeyExtent> failures = client.bulkImport(TraceUtil.traceInfo(), context.rpcCreds(),
tid, Translator.translate(files, Translators.KET), setTime);
return Translator.translate(failures, Translators.TKET);
} finally {
ThriftUtil.returnClient((TServiceClient) client);
}
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (Throwable t) {
log.error("Encountered unknown exception in assignMapFiles.", t);
throw new AccumuloException(t);
}
}
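
/**
 * Finds all tablets that overlap any key in the given file.
 */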
public static List<TabletLocation> findOverlappingTablets(ServerContext context, VolumeManager fs,
TabletLocator locator, Path file) throws Exception {
return findOverlappingTablets(context, fs, locator, file, null, null);
}
public static List<TabletLocation> findOverlappingTablets(ServerContext context, VolumeManager fs,
TabletLocator locator, Path file, KeyExtent failed) throws Exception {
locator.invalidateCache(failed);
Text start = getStartRowForExtent(failed);
return findOverlappingTablets(context, fs, locator, file, start, failed.endRow());
}
protected static Text getStartRowForExtent(KeyExtent extent) {
Text start = extent.prevEndRow();
if (start != null) {
start = new Text(start);
// ACCUMULO-3967 We want the first possible key in this tablet, not the following row from the
// previous tablet
start.append(byte0, 0, 1);
}
return start;
}
static final byte[] byte0 = {0};
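
/**
 * Walks the file to find overlapping tablets within the given row range: seek to a row, locate
 * the tablet containing the file's next key, record it, then jump to just past that tablet's end
 * row and repeat until the end row or the end of the file is reached.
 */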
public static List<TabletLocation> findOverlappingTablets(ServerContext context, VolumeManager vm,
TabletLocator locator, Path file, Text startRow, Text endRow) throws Exception {
List<TabletLocation> result = new ArrayList<>();
Collection<ByteSequence> columnFamilies = Collections.emptyList();
String filename = file.toString();
// log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
FileSystem fs = vm.getFileSystemByPath(file);
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(filename, fs, fs.getConf(), context.getCryptoService())
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
Text row = startRow;
if (row == null)
row = new Text();
while (true) {
// log.debug(filename + " Seeking to row " + row);
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
// log.debug(filename + " not found");
break;
}
row = reader.getTopKey().getRow();
TabletLocation tabletLocation = locator.locateTablet(context, row, false, true);
// log.debug(filename + " found row " + row + " at location " + tabletLocation);
result.add(tabletLocation);
row = tabletLocation.tablet_extent.endRow();
if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
row = new Text(row);
row.append(byte0, 0, byte0.length);
} else
break;
}
}
// log.debug(filename + " to be sent to " + result);
return result;
}
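
/**
 * Tracks how many map files were assigned to each tablet, along with complete and unrecoverable
 * failures, and renders the bulk import assignment summary.
 */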
public static class AssignmentStats {
private Map<KeyExtent,Integer> counts;
private int numUniqueMapFiles;
private Map<Path,List<KeyExtent>> completeFailures = null;
private Set<Path> failedFailures = null;
AssignmentStats(int fileCount) {
counts = new HashMap<>();
numUniqueMapFiles = fileCount;
}
void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
for (TabletLocation tl : entry.getValue()) {
Integer count = getCount(tl.tablet_extent);
counts.put(tl.tablet_extent, count + 1);
}
}
}
void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
for (KeyExtent ke : entry.getValue()) {
Integer count = getCount(ke);
counts.put(ke, count - 1);
}
}
}
void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
this.completeFailures = completeFailures;
}
private Integer getCount(KeyExtent parent) {
Integer count = counts.get(parent);
if (count == null) {
count = 0;
}
return count;
}
void unrecoveredMapFiles(Set<Path> failedFailures) {
this.failedFailures = failedFailures;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
int totalAssignments = 0;
int tabletsImportedTo = 0;
int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
totalAssignments += entry.getValue();
if (entry.getValue() > 0)
tabletsImportedTo++;
if (entry.getValue() < min)
min = entry.getValue();
if (entry.getValue() > max)
max = entry.getValue();
}
double stddev = 0;
for (Entry<KeyExtent,Integer> entry : counts.entrySet())
stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
stddev = stddev / counts.size();
stddev = Math.sqrt(stddev);
Set<KeyExtent> failedTablets = new HashSet<>();
for (List<KeyExtent> ft : completeFailures.values())
failedTablets.addAll(ft);
sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
sb.append(String.format("# map files with failures : %,10d %6.2f%s%n",
completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
sb.append(String.format("# failed failed map files : %,10d %s%n", failedFailures.size(),
failedFailures.isEmpty() ? "" : " <-- THIS IS BAD"));
sb.append(String.format("# of tablets : %,10d%n", counts.size()));
sb.append(String.format("# tablets imported to : %,10d %6.2f%s%n", tabletsImportedTo,
tabletsImportedTo * 100.0 / counts.size(), "%"));
sb.append(String.format("# tablets with failures : %,10d %6.2f%s%n", failedTablets.size(),
failedTablets.size() * 100.0 / counts.size(), "%"));
sb.append(String.format("min map files per tablet : %,10d%n", min));
sb.append(String.format("max map files per tablet : %,10d%n", max));
sb.append(String.format("avg map files per tablet : %,10.2f (std dev = %.2f)%n",
totalAssignments / (double) counts.size(), stddev));
return sb.toString();
}
}
}