/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.accumulo.testing.continuous;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

/**
 * A BatchWriter implementation that uses bulk import internally. The implementation contains a
 * bug that was found to be useful for testing Accumulo. The bug was left in place and this class
 * was renamed to add Flaky, to signal that it is dangerous for other uses.
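 *
 * <p>
 * A minimal usage sketch; the client, filesystem, table name, work directory, and memory limit
 * below are illustrative assumptions, and exception handling is omitted:
 *
 * <pre>{@code
 * SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits("ci"));
 * try (BatchWriter bw = new FlakyBulkBatchWriter(client, "ci", fs, new Path("/tmp/bbw"),
 *     100_000_000L, () -> splits)) {
 *   Mutation m = new Mutation("row1");
 *   m.put("fam", "qual", "val");
 *   bw.addMutation(m);
 * }
 * }</pre>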
*/
public class FlakyBulkBatchWriter implements BatchWriter {
private static final Logger log = LoggerFactory.getLogger(FlakyBulkBatchWriter.class);
private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
private final String tableName;
private final FileSystem fileSystem;
private final Path workPath;
private final long memLimit;
private final Supplier<SortedSet<Text>> splitSupplier;
private long memUsed;
  private boolean closed = false;

public FlakyBulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
this.fileSystem = fileSystem;
this.workPath = workPath;
this.memLimit = memLimit;
this.splitSupplier = splitSupplier;
  }

@Override
public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException {
Preconditions.checkState(!closed);
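    // defensively copy the mutation so that later changes by the caller do not affect queued data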
mutation = new Mutation(mutation);
mutations.addLast(mutation);
memUsed += mutation.estimatedMemoryUsed();
if (memUsed > memLimit) {
flush();
}
  }

@Override
public synchronized void addMutations(Iterable<Mutation> iterable)
throws MutationsRejectedException {
for (var mutation : iterable) {
addMutation(mutation);
}
  }

@Override
public synchronized void flush() throws MutationsRejectedException {
Preconditions.checkState(!closed);
try {
var splits = splitSupplier.get();
Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
fileSystem.mkdirs(tmpDir);
List<KeyValue> keysValues = new ArrayList<>(mutations.size());
      // capture the count before the loop below drains the deque, for the log message emitted
      // after the bulk import
      int numMutations = mutations.size();

      // remove mutations from the deque as we convert them to Keys, making the Mutation objects
      // available for garbage collection
      Mutation mutation;
while ((mutation = mutations.pollFirst()) != null) {
for (var columnUpdate : mutation.getUpdates()) {
var builder = Key.builder(false).row(mutation.getRow())
.family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
.visibility(columnUpdate.getColumnVisibility());
if (columnUpdate.hasTimestamp()) {
builder = builder.timestamp(columnUpdate.getTimestamp());
}
Key key = builder.deleted(columnUpdate.isDeleted()).build();
keysValues.add(new KeyValue(key, columnUpdate.getValue()));
}
}
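      // KeyValue implements Map.Entry<Key,Value>, so this sorts the data into the ascending key
      // order that rfiles require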
keysValues.sort(Map.Entry.comparingByKey());
RFileWriter writer = null;
byte[] currEndRow = null;
int nextFileNameCounter = 0;
var loadPlanBuilder = LoadPlan.builder();
      // This code is broken because Arrays.compare compares bytes as signed integers, while
      // Accumulo treats bytes as unsigned 8 bit integers for sorting purposes. This incorrect
      // comparator causes this code to sometimes prematurely close rfiles, which can lead to many
      // files being bulk imported into a single tablet. The files still go to the correct tablet,
      // so this does not cause data loss. This bug was found to be useful in testing because it
      // stresses bulk import and compactions, so it was deliberately kept. If copying this code
      // elsewhere, this bug should probably be fixed.
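      // For example, Arrays.compare(new byte[] {(byte) 0x80}, new byte[] {0x01}) is negative
      // because (byte) 0x80 is -128 as a signed byte, while Accumulo's unsigned ordering treats
      // 0x80 as 128 and sorts it after 0x01.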
Comparator<byte[]> comparator = Arrays::compare;
      // To fix the code above, replace the comparator with the following:
      // Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
for (var keyValue : keysValues) {
var key = keyValue.getKey();
if (writer == null || (currEndRow != null
&& comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}
          // When the above code prematurely closes an rfile because of the incorrect comparator,
          // the following code will find a new tablet. Since the following code uses the Text
          // comparator, its comparisons are correct and it will just find the same tablet for the
          // file that was just closed. This is what causes multiple files to be added to the same
          // tablet.
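          // For example, with splits {g, m} a row j falls in the tablet (g, m]: headSet(j) gives
          // the tablet's previous end row g, and tailSet(j) gives its end row m.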
var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first();
currEndRow = tabletEndRow == null ? null : tabletEndRow.copyBytes();
String filename = String.format("bbw-%05d.rf", nextFileNameCounter++);
writer = RFile.newWriter().to(tmpDir + "/" + filename).withFileSystem(fileSystem).build();
loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, LoadPlan.RangeType.TABLE,
tabletPrevRow, tabletEndRow);
log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
}
writer.append(key, keyValue.getValue());
}
if (writer != null) {
writer.close();
}
// TODO make table time configurable?
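      // the load plan records the tablet range each file was written for, allowing the bulk
      // import to assign files to tablets without inspecting the files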
var loadPlan = loadPlanBuilder.build();
long t1 = System.nanoTime();
client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan)
.tableTime(true).load();
long t2 = System.nanoTime();
log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
loadPlan.getDestinations().size(), mutations.size(), memUsed,
TimeUnit.NANOSECONDS.toMillis(t2 - t1));
fileSystem.delete(tmpDir, true);
mutations.clear();
memUsed = 0;
} catch (Exception e) {
closed = true;
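      // wrap the failure in the BatchWriter API's exception type; no constraint violations or
      // per-server errors are known here, so pass empty collections and one unknown error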
throw new MutationsRejectedException(client, List.of(), Map.of(), List.of(), 1, e);
}
  }

@Override
public synchronized void close() throws MutationsRejectedException {
flush();
closed = true;
}
}