/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.util;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.accumulo.core.cli.ClientOpts;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.hadoop.io.Text;
import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
/**
 * Command-line utility that merges runs of small, adjacent tablets of a table until each merged
 * tablet approaches a goal size. Tablet sizes are computed by scanning the metadata table and
 * summing the sizes of each tablet's files; merges are issued through
 * {@code TableOperations.merge}.
 */
public class Merge {

  /** Wraps any failure encountered while computing or executing merges. */
  public static class MergeException extends Exception {
    private static final long serialVersionUID = 1L;

    MergeException(Exception ex) {
      super(ex);
    }
  }

  private static final Logger log = LoggerFactory.getLogger(Merge.class);

  /**
   * Emits a formatted progress message. Protected so subclasses can redirect or suppress output.
   */
  protected void message(String format, Object... args) {
    log.info(String.format(format, args));
  }

  /** JCommander converter turning a memory spec (e.g. "100M") into a byte count. */
  public static class MemoryConverter implements IStringConverter<Long> {
    @Override
    public Long convert(String value) {
      return ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
    }
  }

  /** JCommander converter turning a raw argument string into a Hadoop {@code Text} row. */
  static class TextConverter implements IStringConverter<Text> {
    @Override
    public Text convert(String value) {
      return new Text(value);
    }
  }

  /** Command-line options for the merge utility. */
  static class Opts extends ClientOpts {
    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
    String tableName;

    @Parameter(names = {"-s", "--size"}, description = "merge goal size",
        converter = MemoryConverter.class)
    Long goalSize = null;

    @Parameter(names = {"-f", "--force"},
        description = "merge small tablets even if merging them to larger"
            + " tablets might cause a split")
    boolean force = false;

    @Parameter(names = {"-b", "--begin"}, description = "start tablet",
        converter = TextConverter.class)
    Text begin = null;

    @Parameter(names = {"-e", "--end"}, description = "end tablet", converter = TextConverter.class)
    Text end = null;
  }

  /**
   * Parses the command line and runs the merge against the configured instance.
   *
   * <p>If no goal size is supplied (or it is less than 1), the table's
   * {@code table.split.threshold} setting is used as the goal, so merged tablets should not
   * immediately split again.
   *
   * @param args command-line arguments; see {@link Opts}
   * @throws MergeException if anything fails while connecting, scanning, or merging
   */
  public void start(String[] args) throws MergeException {
    Opts opts = new Opts();
    try (TraceScope clientTrace = opts.parseArgsAndTrace(Merge.class.getName(), args)) {
      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
        if (!client.tableOperations().exists(opts.tableName)) {
          System.err.println("table " + opts.tableName + " does not exist");
          return;
        }
        if (opts.goalSize == null || opts.goalSize < 1) {
          // fall back to the table's split threshold as a sensible default goal
          AccumuloConfiguration tableConfig =
              new ConfigurationCopy(client.tableOperations().getProperties(opts.tableName));
          opts.goalSize = tableConfig.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
        }
        message("Merging tablets in table %s to %d bytes", opts.tableName, opts.goalSize);
        mergomatic(client, opts.tableName, opts.begin, opts.end, opts.goalSize, opts.force);
      } catch (Exception ex) {
        throw new MergeException(ex);
      }
    }
  }

  public static void main(String[] args) throws MergeException {
    Merge merge = new Merge();
    merge.start(args);
  }

  /** A tablet's extent paired with the total size in bytes of the tablet's files. */
  public static class Size {
    public Size(KeyExtent extent, long size) {
      this.extent = extent;
      this.size = size;
    }

    KeyExtent extent;
    long size;
  }

  /**
   * Walks the tablets of {@code table} between {@code start} and {@code end}, accumulating
   * adjacent tablets until their combined size exceeds {@code goalSize}, and then merging runs of
   * small tablets via {@link #mergeMany}. Any candidates remaining after the walk are merged in a
   * final pass.
   *
   * @param client connection to the instance
   * @param table table whose tablets should be merged; must not be the metadata table
   * @param start first row of the range to consider, or null for -infinity
   * @param end last row of the range to consider, or null for +infinity
   * @param goalSize target combined size, in bytes, for merged tablets
   * @param force merge a lone small tablet into its neighbor even if that may cause a split
   * @throws MergeException if scanning or merging fails
   */
  public void mergomatic(AccumuloClient client, String table, Text start, Text end, long goalSize,
      boolean force) throws MergeException {
    try {
      if (table.equals(MetadataTable.NAME)) {
        throw new IllegalArgumentException("cannot merge tablets on the metadata table");
      }
      List<Size> sizes = new ArrayList<>();
      long totalSize = 0;
      // Merge any until you get larger than the goal size, and then merge one less tablet
      Iterator<Size> sizeIterator = getSizeIterator(client, table, start, end);
      while (sizeIterator.hasNext()) {
        Size next = sizeIterator.next();
        totalSize += next.size;
        sizes.add(next);
        if (totalSize > goalSize) {
          totalSize = mergeMany(client, table, sizes, goalSize, force, false);
        }
      }
      if (sizes.size() > 1) {
        // final pass over whatever candidates are left at the end of the table range
        mergeMany(client, table, sizes, goalSize, force, true);
      }
    } catch (Exception ex) {
      throw new MergeException(ex);
    }
  }

  /**
   * Examines the accumulated {@code sizes} list, merges a leading run of small tablets if one
   * exists, and removes the consumed entries from the list.
   *
   * @param sizes accumulated tablet sizes, in table order; mutated in place
   * @param last true when this is the final pass and all remaining small tablets should be merged
   * @return the combined size of the tablets still in {@code sizes} after any merge
   * @throws MergeException if a merge operation fails
   */
  protected long mergeMany(AccumuloClient client, String table, List<Size> sizes, long goalSize,
      boolean force, boolean last) throws MergeException {
    // skip the big tablets, which will be the typical case; drop the whole prefix of
    // at-or-above-goal tablets in one O(n) operation instead of repeated remove(0)
    int bigTablets = 0;
    while (bigTablets < sizes.size() && sizes.get(bigTablets).size >= goalSize) {
      bigTablets++;
    }
    sizes.subList(0, bigTablets).clear();
    if (sizes.isEmpty()) {
      return 0;
    }
    // collect any small ones: numToMerge is the longest prefix whose combined size stays
    // within the goal (0 means the entire list fits under the goal)
    long mergeSize = 0;
    int numToMerge = 0;
    for (int i = 0; i < sizes.size(); i++) {
      if (mergeSize + sizes.get(i).size > goalSize) {
        numToMerge = i;
        break;
      }
      mergeSize += sizes.get(i).size;
    }
    if (numToMerge > 1) {
      mergeSome(client, table, sizes, numToMerge);
    } else {
      if (numToMerge == 1 && sizes.size() > 1) {
        // here we have the case of a merge candidate that is surrounded by candidates that would
        // split
        if (force) {
          mergeSome(client, table, sizes, 2);
        } else {
          sizes.remove(0);
        }
      }
    }
    if (numToMerge == 0 && sizes.size() > 1 && last) {
      // That's the last tablet, and we have a bunch to merge
      mergeSome(client, table, sizes, sizes.size());
    }
    long result = 0;
    for (Size s : sizes) {
      result += s.size;
    }
    return result;
  }

  /**
   * Merges the first {@code numToMerge} tablets in {@code sizes} and drops them from the list.
   */
  protected void mergeSome(AccumuloClient client, String table, List<Size> sizes, int numToMerge)
      throws MergeException {
    merge(client, table, sizes, numToMerge);
    // clear the merged prefix in one O(n) operation rather than numToMerge calls to remove(0)
    sizes.subList(0, numToMerge).clear();
  }

  /**
   * Issues a single merge covering the first {@code numToMerge} tablets in {@code sizes}. The
   * merge range is (prevEndRow of the first tablet, endRow of the last tablet].
   *
   * @throws MergeException if the merge operation fails
   */
  protected void merge(AccumuloClient client, String table, List<Size> sizes, int numToMerge)
      throws MergeException {
    try {
      Text start = sizes.get(0).extent.prevEndRow();
      Text end = sizes.get(numToMerge - 1).extent.endRow();
      message("Merging %d tablets from (%s to %s]", numToMerge, start == null ? "-inf" : start,
          end == null ? "+inf" : end);
      client.tableOperations().merge(table, start, end);
    } catch (Exception ex) {
      throw new MergeException(ex);
    }
  }

  /**
   * Returns an iterator over the tablets of {@code tablename} between {@code start} and
   * {@code end}, in table order, each paired with the summed size of its files as recorded in the
   * metadata table.
   *
   * @throws MergeException if the table id lookup or the metadata scan setup fails
   */
  protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename, Text start,
      Text end) throws MergeException {
    // open up metadata, walk through the tablets.
    TableId tableId;
    TabletsMetadata tablets;
    try {
      ClientContext context = (ClientContext) client;
      tableId = Tables.getTableId(context, tablename);
      tablets = TabletsMetadata.builder().scanMetadataTable()
          .overRange(new KeyExtent(tableId, end, start).toMetaRange()).fetch(FILES, PREV_ROW)
          .build(context);
    } catch (Exception e) {
      throw new MergeException(e);
    }
    return tablets.stream().map(tm -> {
      long size = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
      return new Size(tm.getExtent(), size);
    }).iterator();
  }
}