/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.accumulo.testing.randomwalk.shard;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.apache.accumulo.testing.randomwalk.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
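
/**
 * Random walk node that bulk loads documents: mutations are written to SequenceFiles, sorted into
 * the tables' current split ranges by a MapReduce job (SortTool), and then bulk imported into the
 * shard data and index tables.
 */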
public class BulkInsert extends Test {
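  /**
   * BatchWriter that, instead of sending mutations to Accumulo, converts them to Key/Value pairs
   * and appends them to a SequenceFile so they can later be sorted and bulk imported.
   */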
  class SeqfileBatchWriter implements BatchWriter {

    SequenceFile.Writer writer;

    SeqfileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(fs.makeQualified(new Path(file))),
          SequenceFile.Writer.keyClass(Key.class), SequenceFile.Writer.valueClass(Value.class));
    }
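
    /**
     * Writes each column update in the mutation to the SequenceFile as a Key/Value pair.
     */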
    @Override
    public void addMutation(Mutation m) throws MutationsRejectedException {
      List<ColumnUpdate> updates = m.getUpdates();
      for (ColumnUpdate cu : updates) {
        Key key = new Key(m.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(),
            cu.getColumnVisibility(), Long.MAX_VALUE, false, false);
        Value val = new Value(cu.getValue(), false);
        try {
          writer.append(key, val);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
    @Override
    public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
      for (Mutation mutation : iterable)
        addMutation(mutation);
    }

    @Override
    public void flush() throws MutationsRejectedException {}

    @Override
    public void close() throws MutationsRejectedException {
      try {
        writer.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
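
  /**
   * Writes a random batch of documents to data and index SequenceFiles, sorts both files into the
   * tables' current split ranges, and bulk imports the results into the data and index tables.
   */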
  @Override
  public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
    String indexTableName = (String) state.get("indexTableName");
    String dataTableName = (String) state.get("docTableName");
    int numPartitions = (Integer) state.get("numPartitions");
    Random rand = state.getRandom();
    long nextDocID = (Long) state.get("nextDocID");

    int minInsert = Integer.parseInt(props.getProperty("minInsert"));
    int maxInsert = Integer.parseInt(props.getProperty("maxInsert"));
    int numToInsert = rand.nextInt(maxInsert - minInsert) + minInsert;
    int maxSplits = Integer.parseInt(props.getProperty("maxSplits"));

    Configuration conf = env.getHadoopConfiguration();
    FileSystem fs = FileSystem.get(conf);

    String rootDir = "/tmp/shard_bulk/" + dataTableName;
    fs.mkdirs(new Path(rootDir));

    // Write the random documents to SequenceFiles instead of directly to Accumulo.
    BatchWriter dataWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/data.seq");
    BatchWriter indexWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/index.seq");

    for (int i = 0; i < numToInsert; i++) {
      String docID = Insert.insertRandomDocument(nextDocID++, dataWriter, indexWriter,
          indexTableName, dataTableName, numPartitions, rand);
      log.debug("Bulk inserting document " + docID);
    }

    state.set("nextDocID", Long.valueOf(nextDocID));

    dataWriter.close();
    indexWriter.close();

    // Sort each SequenceFile into files aligned with the table's splits, then bulk import them.
    sort(state, env, fs, dataTableName, rootDir + "/data.seq", rootDir + "/data_bulk",
        rootDir + "/data_work", maxSplits);
    sort(state, env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk",
        rootDir + "/index_work", maxSplits);

    bulkImport(fs, state, env, dataTableName, rootDir, "data");
    bulkImport(fs, state, env, indexTableName, rootDir, "index");

    fs.delete(new Path(rootDir), true);
  }
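
  /**
   * Bulk imports the prefix's bulk directory into the table. Files that land in the failure
   * directory (other than leftover .seq files, which are ignored) are moved back into the bulk
   * directory and the import is retried until nothing fails.
   */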
  @SuppressWarnings("deprecation")
  private void bulkImport(FileSystem fs, State state, RandWalkEnv env, String tableName,
      String rootDir, String prefix) throws Exception {
    while (true) {
      String bulkDir = rootDir + "/" + prefix + "_bulk";
      String failDir = rootDir + "/" + prefix + "_failure";
      Path failPath = new Path(failDir);
      fs.delete(failPath, true);
      fs.mkdirs(failPath);
      env.getAccumuloClient().tableOperations().importDirectory(tableName, bulkDir, failDir, true);

      FileStatus[] failures = fs.listStatus(failPath);
      if (failures != null && failures.length > 0) {
        log.warn("Failed to bulk import some files, retrying");
        for (FileStatus failure : failures) {
          if (!failure.getPath().getName().endsWith(".seq"))
            fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName()));
          else
            log.debug("Ignoring " + failure.getPath());
        }
        sleepUninterruptibly(3, TimeUnit.SECONDS);
      } else
        break;
    }
  }
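
  /**
   * Writes up to maxSplits of the table's current split points (Base64 encoded) to a splits file,
   * then runs the SortTool MapReduce job to sort the SequenceFile into the output directory using
   * those split points.
   */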
  private void sort(State state, RandWalkEnv env, FileSystem fs, String tableName, String seqFile,
      String outputDir, String workDir, int maxSplits) throws Exception {

    PrintStream out = new PrintStream(
        new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))), false,
        UTF_8.name());

    AccumuloClient client = env.getAccumuloClient();
    Collection<Text> splits = client.tableOperations().listSplits(tableName, maxSplits);
    for (Text split : splits)
      out.println(Base64.getEncoder().encodeToString(split.copyBytes()));

    out.close();

    SortTool sortTool = new SortTool(seqFile, outputDir, workDir + "/splits.txt", splits);

    if (ToolRunner.run(env.getHadoopConfiguration(), sortTool, new String[0]) != 0) {
      throw new Exception("Failed to run map/reduce sort job");
    }
  }
}