| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.core.client.impl; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static java.util.stream.Collectors.groupingBy; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.stream.Stream; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.NamespaceExistsException; |
| import org.apache.accumulo.core.client.NamespaceNotFoundException; |
| import org.apache.accumulo.core.client.TableExistsException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments; |
| import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; |
| import org.apache.accumulo.core.client.impl.Bulk.FileInfo; |
| import org.apache.accumulo.core.client.impl.Bulk.Files; |
| import org.apache.accumulo.core.client.impl.Table.ID; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.ClientProperty; |
| import org.apache.accumulo.core.conf.ConfigurationCopy; |
| import org.apache.accumulo.core.conf.ConfigurationTypeHelper; |
| import org.apache.accumulo.core.data.ByteSequence; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.LoadPlan; |
| import org.apache.accumulo.core.data.LoadPlan.Destination; |
| import org.apache.accumulo.core.data.LoadPlan.RangeType; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.impl.KeyExtent; |
| import org.apache.accumulo.core.file.FileOperations; |
| import org.apache.accumulo.core.file.FileSKVIterator; |
| import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; |
| import org.apache.accumulo.core.master.thrift.FateOperation; |
| import org.apache.accumulo.core.util.CachedConfiguration; |
| import org.apache.accumulo.core.volume.VolumeConfiguration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.Sets; |
| |
| public class BulkImport implements ImportDestinationArguments, ImportMappingOptions { |
| |
| private static final Logger log = LoggerFactory.getLogger(BulkImport.class); |
| |
| private boolean setTime = false; |
| private Executor executor = null; |
| private final String dir; |
| private int numThreads = -1; |
| |
| private final ClientContext context; |
| private String tableName; |
| |
| private LoadPlan plan = null; |
| |
/**
 * Creates a bulk import builder for the given directory.
 *
 * @param directory
 *          directory to import from; must not be null
 * @param context
 *          client context used when the import is executed
 */
BulkImport(String directory, ClientContext context) {
  dir = Objects.requireNonNull(directory);
  this.context = context;
}
| |
| @Override |
| public ImportMappingOptions tableTime() { |
| this.setTime = true; |
| return this; |
| } |
| |
| @Override |
| public void load() |
| throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException { |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| Map<String,String> props = context.getClient().instanceOperations().getSystemConfiguration(); |
| AccumuloConfiguration conf = new ConfigurationCopy(props); |
| |
| FileSystem fs = VolumeConfiguration.getVolume(dir, CachedConfiguration.getInstance(), conf) |
| .getFileSystem(); |
| |
| Path srcPath = checkPath(fs, dir); |
| |
| SortedMap<KeyExtent,Bulk.Files> mappings; |
| if (plan == null) { |
| mappings = computeMappingFromFiles(fs, tableId, srcPath); |
| } else { |
| mappings = computeMappingFromPlan(fs, tableId, srcPath); |
| } |
| |
| BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()), |
| ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)), |
| ByteBuffer.wrap((setTime + "").getBytes(UTF_8))); |
| doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName); |
| } |
| |
| /** |
| * Check path of bulk directory and permissions |
| */ |
| private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloException { |
| Path ret; |
| |
| if (dir.contains(":")) { |
| ret = new Path(dir); |
| } else { |
| ret = fs.makeQualified(new Path(dir)); |
| } |
| |
| try { |
| if (!fs.getFileStatus(ret).isDirectory()) { |
| throw new AccumuloException("Bulk import directory " + dir + " is not a directory!"); |
| } |
| Path tmpFile = new Path(ret, "isWritable"); |
| if (fs.createNewFile(tmpFile)) |
| fs.delete(tmpFile, true); |
| else |
| throw new AccumuloException("Bulk import directory " + dir + " is not writable."); |
| } catch (FileNotFoundException fnf) { |
| throw new AccumuloException( |
| "Bulk import directory " + dir + " does not exist or has bad permissions", fnf); |
| } |
| |
| // TODO ensure dir does not contain bulk load mapping |
| |
| return ret; |
| } |
| |
| @Override |
| public ImportMappingOptions executor(Executor service) { |
| this.executor = Objects.requireNonNull(service); |
| return this; |
| } |
| |
| @Override |
| public ImportMappingOptions threads(int numThreads) { |
| Preconditions.checkArgument(numThreads > 0, "Non positive number of threads given : %s", |
| numThreads); |
| this.numThreads = numThreads; |
| return this; |
| } |
| |
| @Override |
| public ImportMappingOptions plan(LoadPlan plan) { |
| this.plan = plan; |
| return this; |
| } |
| |
| @Override |
| public ImportMappingOptions to(String tableName) { |
| this.tableName = Objects.requireNonNull(tableName); |
| return this; |
| } |
| |
| private static final byte[] byte0 = {0}; |
| |
| private static class MLong { |
| public MLong(long i) { |
| l = i; |
| } |
| |
| long l; |
| } |
| |
| public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile, |
| long fileSize, Collection<KeyExtent> extents, FileSystem ns, Cache<String,Long> fileLenCache) |
| throws IOException { |
| |
| if (extents.size() == 1) { |
| return Collections.singletonMap(extents.iterator().next(), fileSize); |
| } |
| |
| long totalIndexEntries = 0; |
| Map<KeyExtent,MLong> counts = new TreeMap<>(); |
| for (KeyExtent keyExtent : extents) |
| counts.put(keyExtent, new MLong(0)); |
| |
| Text row = new Text(); |
| |
| FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder() |
| .forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf) |
| .withFileLenCache(fileLenCache).build(); |
| |
| try { |
| while (index.hasTop()) { |
| Key key = index.getTopKey(); |
| totalIndexEntries++; |
| key.getRow(row); |
| |
| // TODO this could use a binary search |
| for (Entry<KeyExtent,MLong> entry : counts.entrySet()) |
| if (entry.getKey().contains(row)) |
| entry.getValue().l++; |
| |
| index.next(); |
| } |
| } finally { |
| try { |
| if (index != null) |
| index.close(); |
| } catch (IOException e) { |
| log.debug("Failed to close " + mapFile, e); |
| } |
| } |
| |
| Map<KeyExtent,Long> results = new TreeMap<>(); |
| for (KeyExtent keyExtent : extents) { |
| double numEntries = counts.get(keyExtent).l; |
| if (numEntries == 0) |
| numEntries = 1; |
| long estSize = (long) ((numEntries / totalIndexEntries) * fileSize); |
| results.put(keyExtent, estSize); |
| } |
| return results; |
| } |
| |
| public interface KeyExtentCache { |
| KeyExtent lookup(Text row) |
| throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException; |
| } |
| |
| public static List<KeyExtent> findOverlappingTablets(ClientContext context, |
| KeyExtentCache extentCache, FileSKVIterator reader) |
| throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| Text startRow = null; |
| Text endRow = null; |
| |
| List<KeyExtent> result = new ArrayList<>(); |
| Collection<ByteSequence> columnFamilies = Collections.emptyList(); |
| Text row = startRow; |
| if (row == null) |
| row = new Text(); |
| while (true) { |
| // log.debug(filename + " Seeking to row " + row); |
| reader.seek(new Range(row, null), columnFamilies, false); |
| if (!reader.hasTop()) { |
| // log.debug(filename + " not found"); |
| break; |
| } |
| row = reader.getTopKey().getRow(); |
| KeyExtent extent = extentCache.lookup(row); |
| // log.debug(filename + " found row " + row + " at location " + tabletLocation); |
| result.add(extent); |
| row = extent.getEndRow(); |
| if (row != null && (endRow == null || row.compareTo(endRow) < 0)) { |
| row = nextRow(row); |
| } else |
| break; |
| } |
| |
| return result; |
| } |
| |
| private static Text nextRow(Text row) { |
| Text next = new Text(row); |
| next.append(byte0, 0, byte0.length); |
| return next; |
| } |
| |
| public static List<KeyExtent> findOverlappingTablets(ClientContext context, |
| KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache) |
| throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() |
| .forFile(file.toString(), fs, fs.getConf()) |
| .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) |
| .seekToBeginning().build()) { |
| return findOverlappingTablets(context, extentCache, reader); |
| } |
| } |
| |
| private static Map<String,Long> getFileLenMap(FileStatus[] statuses) { |
| HashMap<String,Long> fileLens = new HashMap<>(); |
| for (FileStatus status : statuses) { |
| fileLens.put(status.getPath().getName(), status.getLen()); |
| status.getLen(); |
| } |
| |
| return fileLens; |
| } |
| |
| private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) { |
| Map<String,Long> fileLens = getFileLenMap(statuses); |
| |
| Map<String,Long> absFileLens = new HashMap<>(); |
| fileLens.forEach((k, v) -> { |
| absFileLens.put(CachableBlockFile.pathToCacheId(new Path(dir, k)), v); |
| }); |
| |
| Cache<String,Long> fileLenCache = CacheBuilder.newBuilder().build(); |
| |
| fileLenCache.putAll(absFileLens); |
| |
| return fileLenCache; |
| } |
| |
| private SortedMap<KeyExtent,Files> computeMappingFromPlan(FileSystem fs, ID tableId, Path srcPath) |
| throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream() |
| .collect(groupingBy(Destination::getFileName)); |
| |
| FileStatus[] statuses = fs.listStatus(srcPath, |
| p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)); |
| |
| Map<String,Long> fileLens = getFileLenMap(statuses); |
| |
| if (!fileDestinations.keySet().equals(fileLens.keySet())) { |
| throw new IllegalArgumentException( |
| "Load plan files differ from directory files, symmetric difference : " |
| + Sets.symmetricDifference(fileDestinations.keySet(), fileLens.keySet())); |
| } |
| |
| KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context); |
| |
| // Pre-populate cache by looking up all end rows in sorted order. Doing this in sorted order |
| // leverages read ahead. |
| fileDestinations.values().stream().flatMap(List::stream) |
| .filter(dest -> dest.getRangeType() == RangeType.FILE) |
| .flatMap(dest -> Stream.of(dest.getStartRow(), dest.getEndRow())).filter(row -> row != null) |
| .map(Text::new).sorted().distinct().forEach(row -> { |
| try { |
| extentCache.lookup(row); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| |
| SortedMap<KeyExtent,Files> mapping = new TreeMap<>(); |
| |
| for (Entry<String,List<Destination>> entry : fileDestinations.entrySet()) { |
| String fileName = entry.getKey(); |
| List<Destination> destinations = entry.getValue(); |
| Set<KeyExtent> extents = mapDesitnationsToExtents(tableId, extentCache, destinations); |
| |
| long estSize = (long) (fileLens.get(fileName) / (double) extents.size()); |
| |
| for (KeyExtent keyExtent : extents) { |
| mapping.computeIfAbsent(keyExtent, k -> new Files()) |
| .add(new FileInfo(fileName, estSize, 0)); |
| } |
| } |
| |
| return mergeOverlapping(mapping); |
| } |
| |
| private Text toText(byte[] row) { |
| return row == null ? null : new Text(row); |
| } |
| |
| private Set<KeyExtent> mapDesitnationsToExtents(Table.ID tableId, KeyExtentCache kec, |
| List<Destination> destinations) |
| throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| Set<KeyExtent> extents = new HashSet<>(); |
| |
| for (Destination dest : destinations) { |
| |
| if (dest.getRangeType() == RangeType.TABLE) { |
| extents.add(new KeyExtent(tableId, toText(dest.getEndRow()), toText(dest.getStartRow()))); |
| } else if (dest.getRangeType() == RangeType.FILE) { |
| Text startRow = new Text(dest.getStartRow()); |
| Text endRow = new Text(dest.getEndRow()); |
| |
| KeyExtent extent = kec.lookup(startRow); |
| |
| extents.add(extent); |
| |
| while (!extent.contains(endRow) && extent.getEndRow() != null) { |
| extent = kec.lookup(nextRow(extent.getEndRow())); |
| extents.add(extent); |
| } |
| |
| } else { |
| throw new IllegalStateException(); |
| } |
| } |
| |
| return extents; |
| } |
| |
| private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, Table.ID tableId, |
| Path dirPath) throws IOException { |
| |
| Executor executor; |
| ExecutorService service = null; |
| |
| if (this.executor != null) { |
| executor = this.executor; |
| } else if (numThreads > 0) { |
| executor = service = Executors.newFixedThreadPool(numThreads); |
| } else { |
| String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey()); |
| executor = service = Executors |
| .newFixedThreadPool(ConfigurationTypeHelper.getNumThreads(threads)); |
| } |
| |
| try { |
| return computeFileToTabletMappings(fs, tableId, dirPath, executor, context); |
| } finally { |
| if (service != null) { |
| service.shutdown(); |
| } |
| } |
| |
| } |
| |
| public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs, |
| Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException { |
| |
| KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context); |
| |
| FileStatus[] files = fs.listStatus(dirPath, |
| p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)); |
| |
| // we know all of the file lens, so construct a cache and populate it in order to avoid later |
| // trips to the namenode |
| Cache<String,Long> fileLensCache = getPopulatedFileLenCache(dirPath, files); |
| |
| List<CompletableFuture<Map<KeyExtent,Bulk.FileInfo>>> futures = new ArrayList<>(); |
| |
| for (FileStatus fileStatus : files) { |
| CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> { |
| try { |
| long t1 = System.currentTimeMillis(); |
| List<KeyExtent> extents = findOverlappingTablets(context, extentCache, |
| fileStatus.getPath(), fs, fileLensCache); |
| Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), |
| fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache); |
| Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>(); |
| for (KeyExtent ke : extents) { |
| pathLocations.put(ke, |
| new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L))); |
| } |
| long t2 = System.currentTimeMillis(); |
| log.trace("Mapped {} to {} tablets in {}ms", fileStatus.getPath(), pathLocations.size(), |
| t2 - t1); |
| return pathLocations; |
| } catch (Exception e) { |
| throw new CompletionException(e); |
| } |
| }, executor); |
| |
| futures.add(future); |
| } |
| |
| SortedMap<KeyExtent,Bulk.Files> mappings = new TreeMap<>(); |
| |
| for (CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future : futures) { |
| try { |
| Map<KeyExtent,Bulk.FileInfo> pathMapping = future.get(); |
| pathMapping.forEach((extent, path) -> { |
| mappings.computeIfAbsent(extent, k -> new Bulk.Files()).add(path); |
| }); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } catch (ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| return mergeOverlapping(mappings); |
| } |
| |
| // This method handles the case of splits happening while files are being examined. It merges |
| // smaller tablets into large tablets. |
| static SortedMap<KeyExtent,Bulk.Files> mergeOverlapping( |
| SortedMap<KeyExtent,Bulk.Files> mappings) { |
| List<KeyExtent> extents = new ArrayList<>(mappings.keySet()); |
| |
| for (KeyExtent ke : extents) { |
| Set<KeyExtent> overlapping = KeyExtent.findOverlapping(ke, mappings); |
| for (KeyExtent oke : overlapping) { |
| if (ke.equals(oke)) { |
| continue; |
| } |
| |
| boolean containsPrevRow = ke.getPrevEndRow() == null || (oke.getPrevEndRow() != null |
| && ke.getPrevEndRow().compareTo(oke.getPrevEndRow()) <= 0); |
| boolean containsEndRow = ke.getEndRow() == null |
| || (oke.getEndRow() != null && ke.getEndRow().compareTo(oke.getEndRow()) >= 0); |
| |
| if (containsPrevRow && containsEndRow) { |
| mappings.get(ke).merge(mappings.remove(oke)); |
| } else { |
| throw new RuntimeException("TODO handle merges"); |
| } |
| } |
| } |
| |
| return mappings; |
| } |
| |
| private String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, |
| String tableName) throws AccumuloSecurityException, AccumuloException { |
| try { |
| return new TableOperationsImpl(context).doFateOperation(op, args, opts, tableName); |
| } catch (TableExistsException | TableNotFoundException | NamespaceNotFoundException |
| | NamespaceExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| } |