// blob: 2d37f7518c11a29481542674692d9c204d012b65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.compaction;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Stream;
import org.apache.accumulo.cluster.AccumuloCluster;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import com.beust.jcommander.internal.Maps;
/**
 * Static helpers shared by the external compaction integration tests: table creation, data
 * loading and verification, mini cluster configuration, and polling of the compaction
 * coordinator and tablet metadata.
 */
public class ExternalCompactionTestUtils {

  /** Number of rows written by {@link #writeData(AccumuloClient, String)}. */
  public static final int MAX_DATA = 1000;

  // External compaction queue names. configureMiniCluster wires QUEUEn to the compaction
  // service named "csn".
  public static final String QUEUE1 = "DCQ1";
  public static final String QUEUE2 = "DCQ2";
  public static final String QUEUE3 = "DCQ3";
  public static final String QUEUE4 = "DCQ4";
  public static final String QUEUE5 = "DCQ5";
  public static final String QUEUE6 = "DCQ6";
  public static final String QUEUE7 = "DCQ7";
  public static final String QUEUE8 = "DCQ8";

  /**
   * Builds the row id used by the test data.
   *
   * @param r row number
   * @return {@code "r:"} followed by {@code r} zero-padded to at least four digits
   */
  public static String row(int r) {
    return String.format("r:%04d", r);
  }

  /**
   * Streams the external compaction final states whose extent belongs to the given table.
   *
   * @param cluster cluster whose server context is used to read the final states
   * @param tid table to filter on
   * @return the final states recorded for tablets of {@code tid}
   */
  public static Stream<ExternalCompactionFinalState> getFinalStatesForTable(AccumuloCluster cluster,
      TableId tid) {
    return cluster.getServerContext().getAmple().getExternalCompactionFinalStates()
        .filter(state -> state.getExtent().tableId().equals(tid));
  }

  /**
   * Starts a compaction of {@code table1} with a {@link TestFilter} iterator attached.
   *
   * @param client client used to issue the compaction
   * @param table1 table to compact
   * @param modulus passed to the filter as the {@code modulus} option
   * @param expectedQueue passed to the filter as the {@code expectedQ} option
   * @param wait whether the call blocks until the compaction finishes
   */
  public static void compact(final AccumuloClient client, String table1, int modulus,
      String expectedQueue, boolean wait)
      throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
    IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
    // make sure iterator options make it to compactor process
    iterSetting.addOption("expectedQ", expectedQueue);
    iterSetting.addOption("modulus", Integer.toString(modulus));
    CompactionConfig config =
        new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait);
    client.tableOperations().compact(table1, config);
  }

  /**
   * Creates a table whose compactions are dispatched to the given compaction service.
   *
   * @param client client used to create the table
   * @param tableName name of the new table
   * @param service compaction service name, e.g. {@code "cs1"}
   */
  public static void createTable(AccumuloClient client, String tableName, String service)
      throws Exception {
    NewTableConfiguration ntc =
        new NewTableConfiguration().setProperties(dispatcherProps(service));
    client.tableOperations().create(tableName, ntc);
  }

  /**
   * Creates a table pre-split on evenly spaced rows of the {@link #MAX_DATA} test rows.
   *
   * <p>
   * A split is added every {@code MAX_DATA / numTablets} rows. Because of integer division the
   * resulting tablet count equals {@code numTablets} only when it divides {@code MAX_DATA}
   * evenly.
   *
   * @param client client used to create the table
   * @param tableName name of the new table
   * @param service compaction service name
   * @param numTablets desired number of tablets, between 1 and {@link #MAX_DATA} inclusive
   * @throws IllegalArgumentException if {@code numTablets} is outside that range
   */
  public static void createTable(AccumuloClient client, String tableName, String service,
      int numTablets) throws Exception {
    // Outside this range the step below is zero or negative (or the division fails), which
    // would otherwise surface as an ArithmeticException or a loop that never terminates.
    if (numTablets < 1 || numTablets > MAX_DATA) {
      throw new IllegalArgumentException(
          "numTablets must be between 1 and " + MAX_DATA + ", got " + numTablets);
    }
    SortedSet<Text> splits = new TreeSet<>();
    int jump = MAX_DATA / numTablets;
    for (int r = jump; r < MAX_DATA; r += jump) {
      splits.add(new Text(row(r)));
    }
    createTable(client, tableName, service, splits);
  }

  /**
   * Creates a table with the given split points whose compactions are dispatched to the given
   * compaction service.
   *
   * @param client client used to create the table
   * @param tableName name of the new table
   * @param service compaction service name
   * @param splits split points for the new table
   */
  public static void createTable(AccumuloClient client, String tableName, String service,
      SortedSet<Text> splits) throws Exception {
    NewTableConfiguration ntc =
        new NewTableConfiguration().setProperties(dispatcherProps(service)).withSplits(splits);
    client.tableOperations().create(tableName, ntc);
  }

  /** Table properties that route compactions to {@code service} via SimpleCompactionDispatcher. */
  private static Map<String,String> dispatcherProps(String service) {
    return Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
        "table.compaction.dispatcher.opts.service", service);
  }

  /**
   * Writes rows {@code row(0)} .. {@code row(rows - 1)}, each with an empty column family and
   * qualifier and the row number as value, then flushes the table.
   *
   * @param client client used to write
   * @param table1 table to write to
   * @param rows number of rows to write
   */
  public static void writeData(AccumuloClient client, String table1, int rows)
      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
    try (BatchWriter bw = client.createBatchWriter(table1)) {
      for (int i = 0; i < rows; i++) {
        Mutation m = new Mutation(row(i));
        m.put("", "", Integer.toString(i));
        bw.addMutation(m);
      }
    }
    client.tableOperations().flush(table1);
  }

  /** Writes {@link #MAX_DATA} rows; see {@link #writeData(AccumuloClient, String, int)}. */
  public static void writeData(AccumuloClient client, String table1)
      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
    writeData(client, table1, MAX_DATA);
  }

  /** Verifies a table written with {@link #MAX_DATA} rows; see the four-argument overload. */
  public static void verify(AccumuloClient client, String table1, int modulus)
      throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
    verify(client, table1, modulus, MAX_DATA);
  }

  /**
   * Scans the table and asserts that every value is a multiple of {@code modulus} and that the
   * number of entries equals the number of multiples of {@code modulus} in {@code [0, rows)}.
   *
   * @param client client used to scan
   * @param table1 table to scan
   * @param modulus divisor every remaining value must be a multiple of
   * @param rows number of rows originally written
   */
  public static void verify(AccumuloClient client, String table1, int modulus, int rows)
      throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
    try (Scanner scanner = client.createScanner(table1)) {
      int count = 0;
      for (Entry<Key,Value> entry : scanner) {
        // The message is supplied lazily so it is only formatted when the assertion fails.
        assertEquals(0, Integer.parseInt(entry.getValue().toString()) % modulus,
            () -> String.format("%s %s %d != 0", entry.getValue(), "%", modulus));
        count++;
      }
      int expectedCount = 0;
      for (int i = 0; i < rows; i++) {
        if (i % modulus == 0) {
          expectedCount++;
        }
      }
      assertEquals(expectedCount, count);
    }
  }

  /**
   * Configures a mini cluster for the external compaction tests: eight compaction services
   * ({@code cs1}..{@code cs8}), each with a single external executor bound to the matching
   * {@code QUEUEn}, short coordinator check intervals, port search for the coordinator and
   * compactors, and a raw local file system.
   *
   * @param cfg mini cluster configuration to modify
   * @param coreSite Hadoop configuration to modify
   */
  public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
    // ecomp writes from the TabletServer are not being written to the metadata
    // table, they are being queued up instead.
    Map<String,String> clProps = Maps.newHashMap();
    clProps.put(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey(), "2s");
    cfg.setClientProps(clProps);

    // One compaction service per queue; every service sends all work to its external queue.
    cfg.setProperty("tserver.compaction.major.service.cs1.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs1.planner.opts.executors",
        "[{'name':'all', 'type': 'external', 'queue': '" + QUEUE1 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs2.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE2 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs3.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs3.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE3 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs4.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs4.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE4 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs5.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs5.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE5 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs6.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs6.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE6 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs7.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs7.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE7 + "'}]");
    cfg.setProperty("tserver.compaction.major.service.cs8.planner",
        DefaultCompactionPlanner.class.getName());
    cfg.setProperty("tserver.compaction.major.service.cs8.planner.opts.executors",
        "[{'name':'all', 'type': 'external','queue': '" + QUEUE8 + "'}]");

    cfg.setProperty(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL, "5s");
    cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s");
    cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s");
    cfg.setProperty(Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, "true");
    cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true");
    cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10");
    cfg.setProperty(Property.MANAGER_FATE_THREADPOOL_SIZE, "10");
    // use raw local file system so walogs sync and flush will work
    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
  }

  /**
   * Asks the compaction coordinator for its list of running external compactions.
   *
   * @param context client context used to locate and call the coordinator
   * @return the coordinator's running compaction list
   * @throws TTransportException if no coordinator address could be found
   * @throws TException if the RPC fails
   */
  public static TExternalCompactionList getRunningCompactions(ClientContext context)
      throws TException {
    CompactionCoordinatorService.Client client = getCoordinatorClient(context);
    try {
      return client.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
    } finally {
      ThriftUtil.returnClient(client, context);
    }
  }

  /**
   * Asks the compaction coordinator for its list of completed external compactions.
   *
   * @param context client context used to locate and call the coordinator
   * @return the coordinator's completed compaction list
   * @throws TTransportException if no coordinator address could be found
   * @throws TException if the RPC fails
   */
  private static TExternalCompactionList getCompletedCompactions(ClientContext context)
      throws TException {
    CompactionCoordinatorService.Client client = getCoordinatorClient(context);
    try {
      return client.getCompletedCompactions(TraceUtil.traceInfo(), context.rpcCreds());
    } finally {
      ThriftUtil.returnClient(client, context);
    }
  }

  /**
   * Looks up the compaction coordinator and opens a Thrift client to it. The caller must hand
   * the client back with {@code ThriftUtil.returnClient}.
   */
  private static CompactionCoordinatorService.Client getCoordinatorClient(ClientContext context)
      throws TException {
    Optional<HostAndPort> coordinatorHost =
        ExternalCompactionUtil.findCompactionCoordinator(context);
    HostAndPort address = coordinatorHost.orElseThrow(() -> new TTransportException(
        "Unable to get CompactionCoordinator address from ZooKeeper"));
    return ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, address, context);
  }

  /**
   * Returns the state of the update with the greatest key in the compaction's update map.
   *
   * @param status compaction whose updates are inspected; must have at least one update
   * @return state of the most recent update
   * @throws java.util.NoSuchElementException if the compaction has no updates
   */
  public static TCompactionState getLastState(TExternalCompaction status) {
    // Only the largest key is needed, so take the maximum rather than sorting a copy.
    Long latest = Collections.max(status.getUpdates().keySet());
    return status.getUpdates().get(latest).getState();
  }

  /**
   * Polls the table's tablet metadata every 50 ms until at least one external compaction id is
   * present, then returns every id seen.
   *
   * @param ctx server context used to read tablet metadata
   * @param tid table to watch
   * @return the non-empty set of external compaction ids found
   */
  public static Set<ExternalCompactionId> waitForCompactionStartAndReturnEcids(ServerContext ctx,
      TableId tid) {
    Set<ExternalCompactionId> ecids = new HashSet<>();
    while (true) {
      try (TabletsMetadata tm =
          ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.ECOMP).build()) {
        tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()).forEach(ecids::add);
      }
      if (!ecids.isEmpty()) {
        return ecids;
      }
      UtilWaitThread.sleep(50);
    }
  }

  /**
   * Polls the coordinator every 50 ms until at least one of the given compactions appears in its
   * running list with a status update, asserting that the latest state of each one found is
   * {@link TCompactionState#STARTED}.
   *
   * @param ctx server context used to call the coordinator
   * @param ecids compaction ids to look for
   * @return number of the given compactions found running in the successful poll
   */
  public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompactionId> ecids)
      throws Exception {
    int matches = 0;
    while (matches == 0) {
      TExternalCompactionList running = getRunningCompactions(ctx);
      if (running.getCompactions() != null) {
        for (ExternalCompactionId ecid : ecids) {
          TExternalCompaction tec = running.getCompactions().get(ecid.canonical());
          if (tec != null && tec.getUpdates() != null && !tec.getUpdates().isEmpty()) {
            matches++;
            assertEquals(TCompactionState.STARTED, getLastState(tec));
          }
        }
      }
      if (matches == 0) {
        UtilWaitThread.sleep(50);
      }
    }
    return matches;
  }

  /**
   * Waits until the coordinator's running list has no compactions map and its completed list has
   * one, then asserts that each given compaction is in the completed list with the expected last
   * state.
   *
   * @param ctx server context used to call the coordinator
   * @param ecids compaction ids expected to have completed
   * @param expectedState expected last state of every compaction in {@code ecids}
   */
  public static void confirmCompactionCompleted(ServerContext ctx, Set<ExternalCompactionId> ecids,
      TCompactionState expectedState) throws Exception {
    // The running compaction should be removed. Sleep between polls while compactions are
    // still reported so the coordinator is not hammered with back-to-back RPCs.
    TExternalCompactionList running = getRunningCompactions(ctx);
    while (running.getCompactions() != null) {
      UtilWaitThread.sleep(250);
      running = getRunningCompactions(ctx);
    }
    // The compaction should be in the completed list with the expected state
    TExternalCompactionList completed = getCompletedCompactions(ctx);
    while (completed.getCompactions() == null) {
      UtilWaitThread.sleep(50);
      completed = getCompletedCompactions(ctx);
    }
    for (ExternalCompactionId e : ecids) {
      TExternalCompaction tec = completed.getCompactions().get(e.canonical());
      assertNotNull(tec);
      assertEquals(expectedState, getLastState(tec));
    }
  }
}