| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test.functional; |
| |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThrows; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.math.BigInteger; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.security.MessageDigest; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.ExecutionException; |
| import java.util.stream.Collectors; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.client.admin.TimeType; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.crypto.CryptoServiceFactory; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.LoadPlan; |
| import org.apache.accumulo.core.data.LoadPlan.RangeType; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.file.FileOperations; |
| import org.apache.accumulo.core.file.FileSKVWriter; |
| import org.apache.accumulo.core.file.rfile.RFile; |
| import org.apache.accumulo.core.metadata.schema.TabletMetadata; |
| import org.apache.accumulo.core.metadata.schema.TabletsMetadata; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.harness.MiniClusterConfigurationCallback; |
| import org.apache.accumulo.harness.SharedMiniClusterBase; |
| import org.apache.accumulo.minicluster.MemoryUnit; |
| import org.apache.accumulo.minicluster.ServerType; |
| import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.Text; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| /** |
| * Tests new bulk import technique. For the old technique see {@link BulkOldIT} |
| * |
| * @since 2.0 |
| */ |
| public class BulkNewIT extends SharedMiniClusterBase { |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| SharedMiniClusterBase.startMiniClusterWithConfig(new Callback()); |
| } |
| |
| @AfterClass |
| public static void teardown() { |
| SharedMiniClusterBase.stopMiniCluster(); |
| } |
| |
| private static class Callback implements MiniClusterConfigurationCallback { |
| @Override |
| public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { |
| cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE); |
| |
| // use raw local file system |
| conf.set("fs.file.impl", RawLocalFileSystem.class.getName()); |
| } |
| } |
| |
| @Override |
| protected int defaultTimeoutSeconds() { |
| return 4 * 60; |
| } |
| |
| private String tableName; |
| private AccumuloConfiguration aconf; |
| private FileSystem fs; |
| private String rootPath; |
| |
| @Before |
| public void setupBulkTest() throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| tableName = getUniqueNames(1)[0]; |
| c.tableOperations().create(tableName); |
| aconf = getCluster().getServerContext().getConfiguration(); |
| fs = getCluster().getFileSystem(); |
| rootPath = getCluster().getTemporaryPath().toString(); |
| } |
| } |
| |
| private String getDir(String testName) throws Exception { |
| String dir = rootPath + testName + getUniqueNames(1)[0]; |
| fs.delete(new Path(dir), true); |
| return dir; |
| } |
| |
| private void testSingleTabletSingleFile(AccumuloClient c, boolean offline, boolean setTime) |
| throws Exception { |
| addSplits(c, tableName, "0333"); |
| |
| if (offline) { |
| c.tableOperations().offline(tableName); |
| } |
| |
| String dir = getDir("/testSingleTabletSingleFileNoSplits-"); |
| |
| String h1 = writeData(dir + "/f1.", aconf, 0, 332); |
| |
| c.tableOperations().importDirectory(dir).to(tableName).tableTime(setTime).load(); |
| |
| if (offline) { |
| c.tableOperations().online(tableName); |
| } |
| |
| verifyData(c, tableName, 0, 332, setTime); |
| verifyMetadata(c, tableName, Map.of("0333", Set.of(h1), "null", Set.of())); |
| } |
| |
| @Test |
| public void testSingleTabletSingleFile() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| testSingleTabletSingleFile(client, false, false); |
| } |
| } |
| |
| @Test |
| public void testSetTime() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| tableName = "testSetTime_table1"; |
| NewTableConfiguration newTableConf = new NewTableConfiguration(); |
| // set logical time type so we can set time on bulk import |
| newTableConf.setTimeType(TimeType.LOGICAL); |
| client.tableOperations().create(tableName, newTableConf); |
| testSingleTabletSingleFile(client, false, true); |
| } |
| } |
| |
| @Test |
| public void testSingleTabletSingleFileOffline() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| testSingleTabletSingleFile(client, true, false); |
| } |
| } |
| |
| @Test |
| public void testMaxTablets() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| tableName = "testMaxTablets_table1"; |
| NewTableConfiguration newTableConf = new NewTableConfiguration(); |
| // set logical time type so we can set time on bulk import |
| var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "2"); |
| newTableConf.setProperties(props); |
| client.tableOperations().create(tableName, newTableConf); |
| |
| // test max tablets hit while inspecting bulk files |
| var thrown = assertThrows(RuntimeException.class, () -> testBulkFileMax(false)); |
| var c = thrown.getCause(); |
| assertTrue("Wrong exception: " + c, c instanceof ExecutionException); |
| assertTrue("Wrong exception: " + c.getCause(), |
| c.getCause() instanceof IllegalArgumentException); |
| var msg = c.getCause().getMessage(); |
| assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf")); |
| |
| // test max tablets hit using load plan on the server side |
| c = assertThrows(AccumuloException.class, () -> testBulkFileMax(true)); |
| msg = c.getMessage(); |
| assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf")); |
| } |
| } |
| |
| private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline) |
| throws Exception { |
| if (offline) { |
| c.tableOperations().offline(tableName); |
| } |
| |
| String dir = getDir("/testSingleTabletSingleFileNoSplits-"); |
| |
| String h1 = writeData(dir + "/f1.", aconf, 0, 333); |
| |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| |
| if (offline) { |
| c.tableOperations().online(tableName); |
| } |
| |
| verifyData(c, tableName, 0, 333, false); |
| verifyMetadata(c, tableName, Map.of("null", Set.of(h1))); |
| } |
| |
| @Test |
| public void testSingleTabletSingleFileNoSplits() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| testSingleTabletSingleFileNoSplits(client, false); |
| } |
| } |
| |
| @Test |
| public void testSingleTabletSingleFileNoSplitsOffline() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| testSingleTabletSingleFileNoSplits(client, true); |
| } |
| } |
| |
| @Test |
| public void testBadPermissions() throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| addSplits(c, tableName, "0333"); |
| |
| String dir = getDir("/testBadPermissions-"); |
| |
| writeData(dir + "/f1.", aconf, 0, 333); |
| |
| Path rFilePath = new Path(dir, "f1." + RFile.EXTENSION); |
| FsPermission originalPerms = fs.getFileStatus(rFilePath).getPermission(); |
| fs.setPermission(rFilePath, FsPermission.valueOf("----------")); |
| try { |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| } catch (Exception e) { |
| Throwable cause = e.getCause(); |
| if (!(cause instanceof FileNotFoundException) |
| && !(cause.getCause() instanceof FileNotFoundException)) { |
| fail("Expected FileNotFoundException but threw " + e.getCause()); |
| } |
| } finally { |
| fs.setPermission(rFilePath, originalPerms); |
| } |
| |
| originalPerms = fs.getFileStatus(new Path(dir)).getPermission(); |
| fs.setPermission(new Path(dir), FsPermission.valueOf("dr--r--r--")); |
| try { |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| } catch (AccumuloException ae) { |
| if (!(ae.getCause() instanceof FileNotFoundException)) { |
| fail("Expected FileNotFoundException but threw " + ae.getCause()); |
| } |
| } finally { |
| fs.setPermission(new Path(dir), originalPerms); |
| } |
| } |
| } |
| |
| private void testBulkFile(boolean offline, boolean usePlan) throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| addSplits(c, tableName, "0333 0666 0999 1333 1666"); |
| |
| if (offline) { |
| c.tableOperations().offline(tableName); |
| } |
| |
| String dir = getDir("/testBulkFile-"); |
| |
| Map<String,Set<String>> hashes = new HashMap<>(); |
| for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) { |
| hashes.put(endRow, new HashSet<>()); |
| } |
| |
| // Add a junk file, should be ignored |
| FSDataOutputStream out = fs.create(new Path(dir, "junk")); |
| out.writeChars("ABCDEFG\n"); |
| out.close(); |
| |
| // 1 Tablet 0333-null |
| String h1 = writeData(dir + "/f1.", aconf, 0, 333); |
| hashes.get("0333").add(h1); |
| |
| // 2 Tablets 0666-0334, 0999-0667 |
| String h2 = writeData(dir + "/f2.", aconf, 334, 999); |
| hashes.get("0666").add(h2); |
| hashes.get("0999").add(h2); |
| |
| // 2 Tablets 1333-1000, 1666-1334 |
| String h3 = writeData(dir + "/f3.", aconf, 1000, 1499); |
| hashes.get("1333").add(h3); |
| hashes.get("1666").add(h3); |
| |
| // 2 Tablets 1666-1334, >1666 |
| String h4 = writeData(dir + "/f4.", aconf, 1500, 1999); |
| hashes.get("1666").add(h4); |
| hashes.get("null").add(h4); |
| |
| if (usePlan) { |
| LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)) |
| .loadFileTo("f2.rf", RangeType.TABLE, row(333), row(999)) |
| .loadFileTo("f3.rf", RangeType.FILE, row(1000), row(1499)) |
| .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build(); |
| c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); |
| } else { |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| } |
| |
| if (offline) { |
| c.tableOperations().online(tableName); |
| } |
| |
| verifyData(c, tableName, 0, 1999, false); |
| verifyMetadata(c, tableName, hashes); |
| } |
| } |
| |
| private void testBulkFileMax(boolean usePlan) throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| addSplits(c, tableName, "0333 0666 0999 1333 1666"); |
| |
| String dir = getDir("/testBulkFileMax-"); |
| |
| Map<String,Set<String>> hashes = new HashMap<>(); |
| for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) { |
| hashes.put(endRow, new HashSet<>()); |
| } |
| |
| // Add a junk file, should be ignored |
| FSDataOutputStream out = fs.create(new Path(dir, "junk")); |
| out.writeChars("ABCDEFG\n"); |
| out.close(); |
| |
| // 1 Tablet 0333-null |
| String h1 = writeData(dir + "/f1.", aconf, 0, 333); |
| hashes.get("0333").add(h1); |
| |
| // 3 Tablets 0666-0334, 0999-0667, 1333-1000 |
| String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333); |
| hashes.get("0666").add(h2); |
| hashes.get("0999").add(h2); |
| hashes.get("1333").add(h2); |
| |
| // 1 Tablet 1666-1334 |
| String h3 = writeData(dir + "/f3.", aconf, 1334, 1499); |
| hashes.get("1666").add(h3); |
| |
| // 2 Tablets 1666-1334, >1666 |
| String h4 = writeData(dir + "/f4.", aconf, 1500, 1999); |
| hashes.get("1666").add(h4); |
| hashes.get("null").add(h4); |
| |
| if (usePlan) { |
| LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)) |
| .loadFileTo("bad-file.rf", RangeType.TABLE, row(333), row(1333)) |
| .loadFileTo("f3.rf", RangeType.FILE, row(1334), row(1499)) |
| .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build(); |
| c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); |
| } else { |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| } |
| |
| verifyData(c, tableName, 0, 1999, false); |
| verifyMetadata(c, tableName, hashes); |
| } |
| } |
| |
| @Test |
| public void testBulkFile() throws Exception { |
| testBulkFile(false, false); |
| } |
| |
| @Test |
| public void testBulkFileOffline() throws Exception { |
| testBulkFile(true, false); |
| } |
| |
| @Test |
| public void testLoadPlan() throws Exception { |
| testBulkFile(false, true); |
| } |
| |
| @Test |
| public void testLoadPlanOffline() throws Exception { |
| testBulkFile(true, true); |
| } |
| |
| @Test |
| public void testBadLoadPlans() throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| addSplits(c, tableName, "0333 0666 0999 1333 1666"); |
| |
| String dir = getDir("/testBulkFile-"); |
| |
| writeData(dir + "/f1.", aconf, 0, 333); |
| writeData(dir + "/f2.", aconf, 0, 666); |
| |
| // Create a plan with more files than exists in dir |
| LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)) |
| .loadFileTo("f2.rf", RangeType.TABLE, null, row(666)) |
| .loadFileTo("f3.rf", RangeType.TABLE, null, row(666)).build(); |
| try { |
| c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); |
| fail(); |
| } catch (IllegalArgumentException e) { |
| // ignore |
| } |
| |
| // Create a plan with less files than exists in dir |
| loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)).build(); |
| try { |
| c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); |
| fail(); |
| } catch (IllegalArgumentException e) { |
| // ignore |
| } |
| |
| // Create a plan with tablet boundary that does not exits |
| loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(555)) |
| .loadFileTo("f2.rf", RangeType.TABLE, null, row(555)).build(); |
| try { |
| c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); |
| fail(); |
| } catch (AccumuloException e) { |
| // ignore |
| } |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testEmptyDir() throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| String dir = getDir("/testBulkFile-"); |
| FileSystem fs = getCluster().getFileSystem(); |
| fs.mkdirs(new Path(dir)); |
| c.tableOperations().importDirectory(dir).to(tableName).load(); |
| } |
| } |
| |
| private void addSplits(AccumuloClient client, String tableName, String splitString) |
| throws Exception { |
| SortedSet<Text> splits = new TreeSet<>(); |
| for (String split : splitString.split(" ")) { |
| splits.add(new Text(split)); |
| } |
| client.tableOperations().addSplits(tableName, splits); |
| } |
| |
| private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime) |
| throws Exception { |
| try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { |
| |
| Iterator<Entry<Key,Value>> iter = scanner.iterator(); |
| |
| for (int i = start; i <= end; i++) { |
| if (!iter.hasNext()) { |
| throw new Exception("row " + i + " not found"); |
| } |
| |
| Entry<Key,Value> entry = iter.next(); |
| |
| String row = String.format("%04d", i); |
| |
| if (!entry.getKey().getRow().equals(new Text(row))) { |
| throw new Exception("unexpected row " + entry.getKey() + " " + i); |
| } |
| |
| if (Integer.parseInt(entry.getValue().toString()) != i) { |
| throw new Exception("unexpected value " + entry + " " + i); |
| } |
| |
| if (setTime) { |
| assertEquals(1L, entry.getKey().getTimestamp()); |
| } |
| } |
| |
| if (iter.hasNext()) { |
| throw new Exception("found more than expected " + iter.next()); |
| } |
| } |
| } |
| |
| private void verifyMetadata(AccumuloClient client, String tableName, |
| Map<String,Set<String>> expectedHashes) { |
| |
| Set<String> endRowsSeen = new HashSet<>(); |
| |
| String id = client.tableOperations().tableIdMap().get(tableName); |
| try (TabletsMetadata tablets = TabletsMetadata.builder(client).forTable(TableId.of(id)) |
| .fetch(FILES, LOADED, PREV_ROW).build()) { |
| for (TabletMetadata tablet : tablets) { |
| assertTrue(tablet.getLoaded().isEmpty()); |
| |
| Set<String> fileHashes = tablet.getFiles().stream().map(f -> hash(f.getMetaUpdateDelete())) |
| .collect(Collectors.toSet()); |
| |
| String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString(); |
| |
| assertEquals(expectedHashes.get(endRow), fileHashes); |
| |
| endRowsSeen.add(endRow); |
| } |
| |
| assertEquals(expectedHashes.keySet(), endRowsSeen); |
| } |
| } |
| |
| @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "WEAK_MESSAGE_DIGEST_SHA1"}, |
| justification = "path provided by test; sha-1 is okay for test") |
| private String hash(String filename) { |
| try { |
| byte[] data = Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", ""))); |
| byte[] hash = MessageDigest.getInstance("SHA1").digest(data); |
| return new BigInteger(1, hash).toString(16); |
| } catch (IOException | NoSuchAlgorithmException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private static String row(int r) { |
| return String.format("%04d", r); |
| } |
| |
| private String writeData(String file, AccumuloConfiguration aconf, int s, int e) |
| throws Exception { |
| FileSystem fs = getCluster().getFileSystem(); |
| String filename = file + RFile.EXTENSION; |
| try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder() |
| .forFile(filename, fs, fs.getConf(), CryptoServiceFactory.newDefaultInstance()) |
| .withTableConfiguration(aconf).build()) { |
| writer.startDefaultLocalityGroup(); |
| for (int i = s; i <= e; i++) { |
| writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i))); |
| } |
| } |
| |
| return hash(filename); |
| } |
| } |