blob: ab2405ab8cd0b201459462eee45674093e877ad1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Class to create mini-clusters in the background. In general creating a mini
* Cluster takes 15 - 30 seconds and destroying one can take about 10 seconds.
*
* When there are tests that must create a new cluster per test, the time taken
* to create the clusters can greatly increase the test runtime. If the test
* logic runs for longer than the time to create the cluster, then it can make
* sense to create a new cluster in the background while the first test is
* running. Then when test 1 completes, we can destroy its cluster in the
* background and immediately give a new cluster to test 2.
*
* If the runtime of the test logic is very fast (only a second or two) after
* the cluster is started, this class may not help much, as there will be no
* time to create the new cluster in the background before the next test starts,
* however shutting down the cluster in the background while the new cluster is
* getting created will likely save about 10 seconds per test.
*
* To use this class, set up the cluster provider in a static method annotated
* with @BeforeClass, e.g.:
*
* @BeforeClass
* public static void init() {
* OzoneConfiguration conf = new OzoneConfiguration();
* final int interval = 100;
*
* conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
* interval, TimeUnit.MILLISECONDS);
* ...
*
* ReplicationManagerConfiguration replicationConf =
* conf.getObject(ReplicationManagerConfiguration.class);
* replicationConf.setInterval(Duration.ofSeconds(1));
* conf.setFromObject(replicationConf);
*
* MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
* .setNumDatanodes(numOfDatanodes);
*
* clusterProvider = new MiniOzoneClusterProvider(conf, builder, 5);
* }
*
* Ensure you shut down the provider in an @AfterClass annotated method:
*
* @AfterClass
* public static void shutdown() throws InterruptedException {
* if (clusterProvider != null) {
* clusterProvider.shutdown();
* }
* }
*
* Then in the @Before method, or in the test itself, obtain a cluster:
*
* @Before
* public void setUp() throws Exception {
* cluster = clusterProvider.provide();
* }
*
* @After
* public void tearDown() throws InterruptedException, IOException {
* if (cluster != null) {
* clusterProvider.destroy(cluster);
* }
* }
*
* This only works if the same config / builder object can be passed to each
* cluster in the test suite.
*
* Also note that the expected number of clusters must be passed to the
* provider. The provider builds clusters ahead of demand so that one is
* always held in reserve; without a limit this would leave an extra, unused
* cluster behind once the last test has obtained its cluster. Passing the
* expected number of clusters into the provider avoids that extra creation,
* but if you request more clusters than the limit, an IOException will be
* thrown.
*
*/
public class MiniOzoneClusterProvider {

  private static final Logger LOG
      = LoggerFactory.getLogger(MiniOzoneClusterProvider.class);

  /** Capacity of the queue holding clusters that are ready to hand out. */
  private static final int PRE_CREATE_LIMIT = 1;

  /** Capacity of the queue holding returned clusters awaiting shutdown. */
  private static final int EXPIRED_LIMIT = 4;

  // Set at the end of shutdown(); volatile because the reaper thread reads it
  // without holding the provider lock.
  private volatile boolean shutdown = false;

  private final int clusterLimit;

  // Number of clusters handed out by provide(); only touched inside the
  // synchronized provide() method.
  private int consumedClusterCount = 0;

  private final OzoneConfiguration conf;
  private final MiniOzoneCluster.Builder builder;

  private final Thread createThread;
  private final Thread reapThread;

  // Clusters that have been handed out by provide() and not yet passed back
  // through destroy().
  private final Set<MiniOzoneCluster> createdClusters = new HashSet<>();

  // Ready clusters produced by the create thread and consumed by provide().
  private final BlockingQueue<MiniOzoneCluster> clusters
      = new ArrayBlockingQueue<>(PRE_CREATE_LIMIT);

  // Clusters queued by destroy() and shut down by the reaper thread.
  private final BlockingQueue<MiniOzoneCluster> expiredClusters
      = new ArrayBlockingQueue<>(EXPIRED_LIMIT);

  /**
   * Creates the provider and immediately starts the background threads that
   * create and reap clusters.
   *
   * @param conf The configuration to use when creating the cluster
   * @param builder A builder object with all cluster options set
   * @param clusterLimit The total number of clusters this provider should
   *                     create. If another is requested after this limit has
   *                     been reached, an exception will be thrown.
   */
  public MiniOzoneClusterProvider(OzoneConfiguration conf,
      MiniOzoneCluster.Builder builder, int clusterLimit) {
    this.conf = conf;
    this.builder = builder;
    this.clusterLimit = clusterLimit;
    createThread = createClusters();
    reapThread = reapClusters();
  }

  /**
   * Hands out the next pre-created cluster, waiting up to 100 seconds for
   * one to become available.
   *
   * @return a cluster taken from the queue filled by the create thread
   * @throws InterruptedException if interrupted while waiting for a cluster
   * @throws IOException if the provider has been shut down, the cluster limit
   *                     has already been consumed, or no cluster became
   *                     available within the wait time
   */
  public synchronized MiniOzoneCluster provide()
      throws InterruptedException, IOException {
    ensureNotShutdown();
    if (consumedClusterCount >= clusterLimit) {
      throw new IOException("The cluster limit of " + clusterLimit + " has "
          + "been reached for this provider. Please increase the value set "
          + "in the constructor");
    }
    MiniOzoneCluster cluster = clusters.poll(100, SECONDS);
    if (cluster == null) {
      throw new IOException("Failed to obtain available cluster in time");
    }
    createdClusters.add(cluster);
    consumedClusterCount++;
    return cluster;
  }

  /**
   * Returns a cluster to the provider so the reaper thread can shut it down
   * in the background. Blocks while the expired-cluster queue is full.
   *
   * @param c the cluster to shut down
   * @throws InterruptedException if interrupted while waiting for space in
   *                              the expired-cluster queue
   * @throws IOException if the provider has been shut down
   */
  public synchronized void destroy(MiniOzoneCluster c)
      throws InterruptedException, IOException {
    ensureNotShutdown();
    createdClusters.remove(c);
    expiredClusters.put(c);
  }

  /**
   * Stops cluster creation, queues every remaining cluster for shutdown and
   * waits for the reaper thread to finish.
   *
   * @throws InterruptedException if interrupted while waiting for either
   *                              background thread to terminate
   */
  public synchronized void shutdown() throws InterruptedException {
    createThread.interrupt();
    createThread.join();
    // destroy() refuses to run once the flag is set, so the remaining
    // clusters must be queued before flipping it.
    destroyRemainingClusters();
    shutdown = true;
    reapThread.join();
  }

  /**
   * @throws IOException if {@link #shutdown()} has already completed its
   *                     cleanup and set the shutdown flag
   */
  private void ensureNotShutdown() throws IOException {
    if (shutdown) {
      throw new IOException("The mini-cluster provider is shutdown");
    }
  }

  /**
   * Starts the thread that shuts down clusters queued by
   * {@link #destroy(MiniOzoneCluster)}. The thread keeps running until the
   * shutdown flag is set and the expired-cluster queue is empty.
   *
   * @return the started reaper thread
   */
  private Thread reapClusters() {
    Thread t = new Thread(() -> {
      while (!shutdown || !expiredClusters.isEmpty()) {
        try {
          // Why not just call take and wait forever until interrupt is
          // thrown? Inside MiniCluster.shutdown, there are places where it
          // waits on things to happen, and the interrupt can interrupt those
          // and prevent the shutdown from happening correctly. Therefore it is
          // safer to just poll and avoid interrupting this thread.
          MiniOzoneCluster c = expiredClusters.poll(100,
              TimeUnit.MILLISECONDS);
          if (c != null) {
            c.shutdown();
          }
        } catch (Exception e) {
          LOG.error("Unexpected exception received", e);
        }
      }
    });
    t.setName("Mini-Cluster-Provider-Reap");
    t.start();
    return t;
  }

  /**
   * Starts the thread that builds clusters ahead of demand. It builds up to
   * {@code clusterLimit} clusters, each with a fresh cluster id and freshly
   * allocated OM ports, and blocks whenever the ready queue is full.
   *
   * @return the started creator thread
   */
  private Thread createClusters() {
    Thread t = new Thread(() -> {
      int createdCount = 0;
      while (!Thread.interrupted() && createdCount < clusterLimit) {
        MiniOzoneCluster cluster = null;
        try {
          builder.setClusterId(UUID.randomUUID().toString());

          OzoneConfiguration newConf = new OzoneConfiguration(conf);
          List<Integer> portList = getFreePortList(4);
          newConf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
              "127.0.0.1:" + portList.get(0));
          newConf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
              "127.0.0.1:" + portList.get(1));
          newConf.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY,
              "127.0.0.1:" + portList.get(2));
          newConf.setInt(OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
              portList.get(3));
          builder.setConf(newConf);

          cluster = builder.build();
          cluster.waitForClusterToBeReady();
          createdCount++;
          clusters.put(cluster);
        } catch (InterruptedException e) {
          // shutdown() interrupts this thread; a cluster that was built but
          // not handed over to the queue has no other owner, so stop it here.
          if (cluster != null) {
            cluster.shutdown();
          }
          break;
        } catch (IOException | TimeoutException e) {
          // The cluster may already have been built when the failure happened
          // (e.g. it never became ready). Nothing else holds a reference to
          // it, so shut it down here rather than leaking it.
          if (cluster != null) {
            try {
              cluster.shutdown();
            } catch (RuntimeException shutdownFailure) {
              // Keep the original failure as the primary cause.
              e.addSuppressed(shutdownFailure);
            }
          }
          throw new RuntimeException("Unable to build cluster", e);
        }
      }
    });
    t.setName("Mini-Cluster-Provider-Create");
    t.start();
    return t;
  }

  /**
   * Queues every cluster still known to the provider for shutdown: first the
   * ready clusters that were never handed out, then any handed-out clusters
   * that were not returned through {@link #destroy(MiniOzoneCluster)}.
   * Failures are logged so that the remaining clusters are still processed.
   */
  private void destroyRemainingClusters() {
    MiniOzoneCluster unused;
    while ((unused = clusters.poll()) != null) {
      try {
        destroy(unused);
      } catch (InterruptedException | IOException e) {
        LOG.error("Caught exception when destroying clusters", e);
        // Do nothing as we want to ensure all clusters try to shutdown.
      }
    }
    // If there are any clusters remaining in createdClusters, then destroy
    // them too. This could be due to a test failing to return the cluster
    // for some reason. Iterate over a copy because destroy() removes entries
    // from createdClusters.
    MiniOzoneCluster[] remaining
        = createdClusters.toArray(new MiniOzoneCluster[0]);
    for (MiniOzoneCluster c : remaining) {
      try {
        destroy(c);
      } catch (InterruptedException | IOException e) {
        LOG.error("Caught exception when destroying remaining clusters", e);
      }
    }
    createdClusters.clear();
  }

  /**
   * Obtains port numbers from the local server addresses created by Ratis
   * {@code NetUtils}.
   *
   * @param size the number of ports requested
   * @return the port of each address returned by {@code NetUtils}
   */
  private List<Integer> getFreePortList(int size) {
    return org.apache.ratis.util.NetUtils.createLocalServerAddress(size)
        .stream()
        .map(inetSocketAddress -> inetSocketAddress.getPort())
        .collect(Collectors.toList());
  }
}