blob: 4aca716159d06946bfe82370f7d38573e10d9c84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
/**
 * Integration test for the old (deprecated) bulk import technique, which goes through
 * {@code tableOperations().importDirectory(...)}. For the new bulk import see {@link BulkNewIT}.
 *
 * <p>
 * The test writes three RFiles covering rows {@code 0000}-{@code 1999}, bulk imports them into a
 * pre-split table, and then checks both the resulting file layout and the table contents.
 */
public class BulkOldIT extends AccumuloClusterHarness {

  /**
   * Configures the mini cluster so that tablet servers are given 512 MB of memory.
   */
  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
    cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE);
  }

  /**
   * @return the default timeout for this test class, in seconds (four minutes)
   */
  @Override
  protected int defaultTimeoutSeconds() {
    return 4 * 60;
  }

  /**
   * Creates a table with five split points, writes three RFiles into a fresh directory, bulk
   * imports that directory with the legacy API and verifies the outcome.
   *
   * @throws Exception if any cluster, file system or table operation fails
   * @throws AssertionError if any file ends up in the failure directory or the table contents do
   *         not match what was written
   */
  // suppress importDirectory deprecated since this is the only test for legacy technique
  @SuppressWarnings("deprecation")
  @Test
  public void testBulkFile() throws Exception {
    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
      String tableName = getUniqueNames(1)[0];

      // Five split points are requested, i.e. six tablets
      SortedSet<Text> splits = new TreeSet<>();
      for (String split : "0333 0666 0999 1333 1666".split(" ")) {
        splits.add(new Text(split));
      }
      NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
      c.tableOperations().create(tableName, ntc);

      Configuration conf = new Configuration();
      AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration();
      FileSystem fs = getCluster().getFileSystem();
      String rootPath = cluster.getTemporaryPath().toString();

      // Start from an empty import directory in case a previous run left one behind
      String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];
      fs.delete(new Path(dir), true);

      // Three files that together cover rows 0000..1999 without overlap
      writeData(conf, aconf, fs, dir, "f1", 0, 333);
      writeData(conf, aconf, fs, dir, "f2", 334, 999);
      writeData(conf, aconf, fs, dir, "f3", 1000, 1999);

      // The failure directory is recreated empty so that leftovers cannot be mistaken for failures
      String failDir = dir + "_failures";
      Path failPath = new Path(failDir);
      fs.delete(failPath, true);
      fs.mkdirs(failPath);
      fs.deleteOnExit(failPath);

      // Legacy bulk import; anything left in the failure directory afterwards counts as a failure
      c.tableOperations().importDirectory(tableName, dir, failDir, false);

      if (fs.listStatus(failPath).length > 0) {
        throw new AssertionError("Some files failed to bulk import");
      }

      // NOTE(review): presumably asserts exactly 6 tablets with exactly 1 file each - confirm
      // against FunctionalTestUtils.checkRFiles
      FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1);
      verifyData(c, tableName, 0, 1999);
    }
  }

  /**
   * Writes one RFile named {@code <file>.<RFile.EXTENSION>} into {@code dir}. The file contains
   * one entry per integer in {@code [start, end]}: the row is the integer zero-padded to four
   * digits and the value is its plain decimal string.
   *
   * @param conf Hadoop configuration handed to the file writer builder
   * @param aconf Accumulo configuration used as the table configuration for the writer
   * @param fs file system the file is written to
   * @param dir directory that receives the file
   * @param file base name of the file, without extension
   * @param start first row number to write (inclusive)
   * @param end last row number to write (inclusive)
   * @throws IOException if the file cannot be created, written or closed
   */
  private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs, String dir,
      String file, int start, int end) throws IOException {
    String path = dir + "/" + file + "." + RFile.EXTENSION;
    // try-with-resources so the writer is closed even when an append fails part-way through
    try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
        .forFile(path, fs, conf, CryptoServiceFactory.newDefaultInstance())
        .withTableConfiguration(aconf).build()) {
      writer.startDefaultLocalityGroup();
      for (int i = start; i <= end; i++) {
        writer.append(new Key(new Text(String.format("%04d", i))),
            new Value(Integer.toString(i)));
      }
    }
  }

  /**
   * Scans {@code table} and checks that it contains exactly the entries for rows {@code s} through
   * {@code e} (inclusive), in order, with each value equal to its row number.
   *
   * @param client client used to create the scanner
   * @param table name of the table to scan
   * @param s first expected row number (inclusive)
   * @param e last expected row number (inclusive)
   * @throws Exception if the scanner cannot be created or the scan fails
   * @throws AssertionError if a row is missing, has an unexpected row id or value, or if the table
   *         holds more entries than expected
   */
  private void verifyData(AccumuloClient client, String table, int s, int e) throws Exception {
    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
      Iterator<Entry<Key,Value>> iter = scanner.iterator();

      for (int i = s; i <= e; i++) {
        if (!iter.hasNext()) {
          throw new AssertionError("row " + i + " not found");
        }

        Entry<Key,Value> entry = iter.next();
        String row = String.format("%04d", i);

        if (!entry.getKey().getRow().equals(new Text(row))) {
          throw new AssertionError("unexpected row " + entry.getKey() + " " + i);
        }

        if (Integer.parseInt(entry.getValue().toString()) != i) {
          throw new AssertionError("unexpected value " + entry + " " + i);
        }
      }

      // Anything still left in the scan is data that was never written by this test
      if (iter.hasNext()) {
        throw new AssertionError("found more than expected " + iter.next());
      }
    }
  }
}