blob: 1a97e3df927c295db6e20e34fd15e5f26aac1ddf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.coordinator;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
@RunWith(PowerMockRunner.class)
@PrepareForTest({CompactionCoordinator.class, DeadCompactionDetector.class, ThriftUtil.class,
ExternalCompactionUtil.class})
@SuppressStaticInitializationFor({"org.apache.log4j.LogManager"})
@PowerMockIgnore({"org.slf4j.*", "org.apache.logging.*", "org.apache.log4j.*",
"org.apache.commons.logging.*", "org.xml.*", "javax.xml.*", "org.w3c.dom.*",
"com.sun.org.apache.xerces.*"})
public class CompactionCoordinatorTest {
public class TestCoordinator extends CompactionCoordinator {
private final ServerContext context;
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;
private Set<ExternalCompactionId> metadataCompactionIds = null;
protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers,
ServerAddress client, TabletClientService.Client tabletServerClient, ServerContext context,
AuditedSecurityOperation security) {
super(new ServerOpts(), new String[] {}, context.getConfiguration());
this.compactionFinalizer = finalizer;
this.tserverSet = tservers;
this.client = client;
this.tabletServerClient = tabletServerClient;
this.context = context;
this.security = security;
}
@Override
protected void startDeadCompactionDetector() {}
@Override
protected long getTServerCheckInterval() {
this.shutdown = true;
return 0L;
}
@Override
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
@Override
protected CompactionFinalizer createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
return null;
}
@Override
protected LiveTServerSet createLiveTServerSet() {
return null;
}
@Override
protected void setupSecurity() {}
@Override
protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
@Override
protected void printStartupMsg() {}
@Override
public ServerContext getContext() {
return this.context;
}
@Override
protected void getCoordinatorLock(HostAndPort clientAddress)
throws KeeperException, InterruptedException {}
@Override
protected ServerAddress startCoordinatorClientService() throws UnknownHostException {
return client;
}
@Override
protected Client getTabletServerConnection(TServerInstance tserver) throws TTransportException {
return tabletServerClient;
}
@Override
public void compactionCompleted(TInfo tinfo, TCredentials credentials,
String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
throws ThriftSecurityException {}
@Override
public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
TKeyExtent extent) throws ThriftSecurityException {}
void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
metadataCompactionIds = mci;
}
@Override
protected Set<ExternalCompactionId> readExternalCompactionIds() {
if (metadataCompactionIds == null) {
return RUNNING_CACHE.keySet();
} else {
return metadataCompactionIds;
}
}
public Map<String,TreeMap<Short,TreeSet<TServerInstance>>> getQueues() {
return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES;
}
public Map<TServerInstance,Set<QueueAndPriority>> getIndex() {
return CompactionCoordinator.QUEUE_SUMMARIES.INDEX;
}
public Map<ExternalCompactionId,RunningCompaction> getRunning() {
return RUNNING_CACHE;
}
public void resetInternals() {
getQueues().clear();
getIndex().clear();
getRunning().clear();
metadataCompactionIds = null;
}
}
@Test
public void testCoordinatorWarningTime() {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
SiteConfiguration aconf = SiteConfiguration.empty()
.withOverrides(Map.of(Property.COMPACTOR_MAX_JOB_WAIT_TIME.getKey(), "15s")).build();
ConfigurationCopy config = new ConfigurationCopy(aconf);
expect(context.getConfiguration()).andReturn(config).anyTimes();
PowerMock.replay(context);
var coordinator = new TestCoordinator(null, null, null, null, context, null);
// Should be equal to 3 * 15_000 milliseconds
assertEquals(45_000, coordinator.getMissingCompactorWarningTime());
coordinator.close();
}
@Test
public void testCoordinatorColdStartNoCompactions() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions",
"detectDanglingFinalStateMarkers"));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class);
expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
PowerMock.mockStatic(ExternalCompactionUtil.class);
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
expect(tservers.getCurrentServers()).andReturn(Collections.emptySet()).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
TServerInstance tsi = PowerMock.createNiceMock(TServerInstance.class);
expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
expect(tsc.getCompactionQueueInfo(anyObject(), anyObject())).andReturn(Collections.emptyList())
.anyTimes();
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
coordinator.run();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testCoordinatorColdStart() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions",
"detectDanglingFinalStateMarkers"));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class);
expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
expect(context.rpcCreds()).andReturn(creds);
PowerMock.mockStatic(ExternalCompactionUtil.class);
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
TServerInstance instance = PowerMock.createNiceMock(TServerInstance.class);
expect(tservers.getCurrentServers()).andReturn(Collections.singleton(instance)).once();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
TServerInstance tsi = PowerMock.createNiceMock(TServerInstance.class);
expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class);
expect(tsc.getCompactionQueueInfo(anyObject(), anyObject()))
.andReturn(Collections.singletonList(queueSummary)).anyTimes();
expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes();
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
coordinator.run();
assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1);
Map<Short,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
assertNotNull(m);
assertEquals(1, m.size());
assertTrue(m.containsKey((short) 1));
Set<TServerInstance> t = m.get((short) 1);
assertNotNull(t);
assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
assertEquals(1, coordinator.getIndex().size());
assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
assertEquals(1, i.size());
assertEquals(qp, i.iterator().next());
assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testCoordinatorRestartNoRunningCompactions() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions",
"detectDanglingFinalStateMarkers"));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class);
expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
expect(context.rpcCreds()).andReturn(creds);
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
TServerInstance instance = PowerMock.createNiceMock(TServerInstance.class);
HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997");
expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes();
expect(tservers.getCurrentServers()).andReturn(Sets.newHashSet(instance)).once();
tservers.startListeningForTabletServerChanges();
PowerMock.mockStatic(ExternalCompactionUtil.class);
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
expect(instance.getHostPort()).andReturn("localhost:9997").anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class);
expect(tsc.getCompactionQueueInfo(anyObject(), anyObject()))
.andReturn(Collections.singletonList(queueSummary)).anyTimes();
expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes();
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
coordinator.run();
assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1);
Map<Short,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
assertNotNull(m);
assertEquals(1, m.size());
assertTrue(m.containsKey((short) 1));
Set<TServerInstance> t = m.get((short) 1);
assertNotNull(t);
assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
assertEquals(1, coordinator.getIndex().size());
assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
assertEquals(1, i.size());
assertEquals(qp, i.iterator().next());
assertEquals(0, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testCoordinatorRestartOneRunningCompaction() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions",
"detectDanglingFinalStateMarkers"));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class);
expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
expect(context.rpcCreds()).andReturn(creds);
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
TServerInstance instance = PowerMock.createNiceMock(TServerInstance.class);
HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997");
expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes();
expect(tservers.getCurrentServers()).andReturn(Sets.newHashSet(instance)).once();
tservers.startListeningForTabletServerChanges();
PowerMock.mockStatic(ExternalCompactionUtil.class);
List<RunningCompaction> runningCompactions = new ArrayList<>();
ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class);
expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
TKeyExtent extent = new TKeyExtent();
extent.setTable("1".getBytes());
runningCompactions.add(new RunningCompaction(job, tserverAddress.toString(), "queue"));
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
expect(instance.getHostPort()).andReturn("localhost:9997").anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class);
expect(tsc.getCompactionQueueInfo(anyObject(), anyObject()))
.andReturn(Collections.singletonList(queueSummary)).anyTimes();
expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes();
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
coordinator.run();
assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1);
Map<Short,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
assertNotNull(m);
assertEquals(1, m.size());
assertTrue(m.containsKey((short) 1));
Set<TServerInstance> t = m.get((short) 1);
assertNotNull(t);
assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
assertEquals(1, coordinator.getIndex().size());
assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
assertEquals(1, i.size());
assertEquals(qp, i.iterator().next());
assertEquals(1, coordinator.getRunning().size());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testGetCompactionJob() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
PowerMock.suppress(PowerMock.methods(ThriftUtil.class, "returnClient"));
PowerMock.suppress(PowerMock.methods(DeadCompactionDetector.class, "detectDeadCompactions",
"detectDanglingFinalStateMarkers"));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
MetricsInfo metricsInfo = PowerMock.createNiceMock(MetricsInfo.class);
expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
expect(context.rpcCreds()).andReturn(creds).anyTimes();
PowerMock.mockStatic(ExternalCompactionUtil.class);
List<RunningCompaction> runningCompactions = new ArrayList<>();
expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
.andReturn(runningCompactions);
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
TServerInstance instance = PowerMock.createNiceMock(TServerInstance.class);
expect(tservers.getCurrentServers()).andReturn(Collections.singleton(instance)).once();
HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997");
expect(instance.getHostAndPort()).andReturn(tserverAddress).anyTimes();
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
TServerInstance tsi = PowerMock.createNiceMock(TServerInstance.class);
expect(tsi.getHostPort()).andReturn("localhost:9997").anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
TCompactionQueueSummary queueSummary = PowerMock.createNiceMock(TCompactionQueueSummary.class);
expect(tsc.getCompactionQueueInfo(anyObject(), anyObject()))
.andReturn(Collections.singletonList(queueSummary)).anyTimes();
expect(queueSummary.getQueue()).andReturn("R2DQ").anyTimes();
expect(queueSummary.getPriority()).andReturn((short) 1).anyTimes();
ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
TExternalCompactionJob job = PowerMock.createNiceMock(TExternalCompactionJob.class);
expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
TInfo trace = TraceUtil.traceInfo();
expect(tsc.reserveCompactionJob(trace, creds, "R2DQ", 1, "localhost:10241", eci.toString()))
.andReturn(job).anyTimes();
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
expect(security.canPerformSystemActions(creds)).andReturn(true);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
assertEquals(0, coordinator.getQueues().size());
assertEquals(0, coordinator.getIndex().size());
assertEquals(0, coordinator.getRunning().size());
// Use coordinator.run() to populate the internal data structures. This is tested in a different
// test.
coordinator.run();
assertEquals(1, coordinator.getQueues().size());
QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), (short) 1);
Map<Short,TreeSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
assertNotNull(m);
assertEquals(1, m.size());
assertTrue(m.containsKey((short) 1));
Set<TServerInstance> t = m.get((short) 1);
assertNotNull(t);
assertEquals(1, t.size());
TServerInstance queuedTsi = t.iterator().next();
assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
assertEquals(1, coordinator.getIndex().size());
assertTrue(coordinator.getIndex().containsKey(queuedTsi));
Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
assertEquals(1, i.size());
assertEquals(qp, i.iterator().next());
assertEquals(0, coordinator.getRunning().size());
// Get the next job
TExternalCompactionJob createdJob =
coordinator.getCompactionJob(trace, creds, "R2DQ", "localhost:10241", eci.toString());
assertEquals(eci.toString(), createdJob.getExternalCompactionId());
assertEquals(1, coordinator.getQueues().size());
assertEquals(1, coordinator.getIndex().size());
assertEquals(1, coordinator.getRunning().size());
Entry<ExternalCompactionId,RunningCompaction> entry =
coordinator.getRunning().entrySet().iterator().next();
assertEquals(eci.toString(), entry.getKey().toString());
assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testGetCompactionJobNoJobs() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
expect(security.canPerformSystemActions(creds)).andReturn(true);
PowerMock.replayAll();
var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();
TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ",
"localhost:10240", UUID.randomUUID().toString());
assertNull(job.getExternalCompactionId());
PowerMock.verifyAll();
coordinator.resetInternals();
coordinator.close();
}
@Test
public void testCleanUpRunning() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
TCredentials creds = PowerMock.createNiceMock(TCredentials.class);
CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();
TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);
AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
expect(security.canPerformSystemActions(creds)).andReturn(true);
PowerMock.replayAll();
try (var coordinator =
new TestCoordinator(finalizer, tservers, client, tsc, context, security)) {
coordinator.resetInternals();
var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));
coordinator.cleanUpRunning();
assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());
coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
coordinator.cleanUpRunning();
assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
}
}
}