blob: c95594815048d6180f1b34d186e7cf85dde9d403 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* 
* http://www.apache.org/licenses/LICENSE-2.0
* 
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.recon.ConfigurationProvider;
import org.apache.hadoop.ozone.recon.ReconServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.commons.io.FileUtils;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
* MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
* running tests. The cluster consists of a OzoneManager,
* StorageContainerManager and multiple DataNodes.
*/
@InterfaceAudience.Private
public class MiniOzoneClusterImpl implements MiniOzoneCluster {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
// Cluster-wide configuration shared by OM, SCM and the datanodes.
private final OzoneConfiguration conf;
// Storage Container Manager instance; replaced by a fresh instance on restart.
private StorageContainerManager scm;
// Ozone Manager instance; null when built via the OM-less constructor.
private OzoneManager ozoneManager;
// Live list of datanode services; mutated in place on datanode restart.
private final List<HddsDatanodeService> hddsDatanodes;
// Recon server; null when the cluster was built without Recon.
private ReconServer reconServer;
// Timeout for the cluster to be ready
private int waitForClusterToBeReadyTimeout = 120000; // 2 min
// Certificate client handed to datanodes at start when security is enabled.
private CertificateClient caClient;
/**
 * Creates a new MiniOzoneCluster from already constructed components,
 * without Recon.
 *
 * @param conf cluster-wide configuration
 * @param ozoneManager the Ozone Manager instance
 * @param scm the Storage Container Manager instance
 * @param hddsDatanodes datanode services belonging to this cluster
 */
protected MiniOzoneClusterImpl(OzoneConfiguration conf,
OzoneManager ozoneManager,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
this.conf = conf;
this.ozoneManager = ozoneManager;
this.scm = scm;
this.hddsDatanodes = hddsDatanodes;
}
/**
 * Creates a new MiniOzoneCluster with Recon.
 *
 * @param conf cluster-wide configuration
 * @param ozoneManager the Ozone Manager instance
 * @param scm the Storage Container Manager instance
 * @param hddsDatanodes datanode services belonging to this cluster
 * @param reconServer the Recon server, or null when Recon is disabled
 */
MiniOzoneClusterImpl(OzoneConfiguration conf,
OzoneManager ozoneManager,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes,
ReconServer reconServer) {
this.conf = conf;
this.ozoneManager = ozoneManager;
this.scm = scm;
this.hddsDatanodes = hddsDatanodes;
this.reconServer = reconServer;
}
/**
 * Creates a new MiniOzoneCluster without the OzoneManager. This is used by
 * {@link MiniOzoneHAClusterImpl} for starting multiple OzoneManagers.
 *
 * @param conf cluster-wide configuration
 * @param scm the Storage Container Manager instance
 * @param hddsDatanodes datanode services belonging to this cluster
 * @param reconServer the Recon server, or null when Recon is disabled
 */
MiniOzoneClusterImpl(OzoneConfiguration conf, StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes, ReconServer reconServer) {
this.conf = conf;
this.scm = scm;
this.hddsDatanodes = hddsDatanodes;
this.reconServer = reconServer;
}
/** @return the configuration this cluster was built with. */
public OzoneConfiguration getConf() {
return conf;
}
/** @return the OM service id; always null for a non-HA cluster. */
public String getServiceId() {
// Non-HA cluster doesn't have OM Service Id.
return null;
}
/**
 * Waits for the Ozone cluster to be ready for processing requests: every
 * datanode registered as HEALTHY with SCM, and SCM out of safe mode.
 *
 * @throws TimeoutException if not ready within the configured timeout
 * @throws InterruptedException if the wait is interrupted
 */
@Override
public void waitForClusterToBeReady()
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    final int healthy = scm.getNodeCount(HEALTHY);
    final boolean isNodeReady = healthy == hddsDatanodes.size();
    final boolean exitSafeMode = !scm.isInSafeMode();
    LOG.info("{}. Got {} of {} DN Heartbeats.",
        isNodeReady ? "Nodes are ready" : "Waiting for nodes to be ready",
        healthy, hddsDatanodes.size());
    // Fix: the old call passed (healthy, size) with no matching {}
    // placeholders in the message; SLF4J silently discarded them.
    LOG.info(exitSafeMode ? "Cluster exits safe mode"
        : "Waiting for cluster to exit safe mode");
    return isNodeReady && exitSafeMode;
  }, 1000, waitForClusterToBeReadyTimeout);
}
/**
 * Waits until at least one RATIS pipeline with the given replication
 * factor is reported in OPEN state.
 *
 * @param factor replication factor the pipeline must have
 * @param timeoutInMs maximum time to wait, in milliseconds
 */
@Override
public void waitForPipelineTobeReady(HddsProtos.ReplicationFactor factor,
    int timeoutInMs) throws
    TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    List<Pipeline> openPipelines = scm.getPipelineManager()
        .getPipelines(HddsProtos.ReplicationType.RATIS, factor,
            Pipeline.PipelineState.OPEN);
    return !openPipelines.isEmpty();
  }, 1000, timeoutInMs);
}
/**
 * Sets the timeout value after which
 * {@link MiniOzoneClusterImpl#waitForClusterToBeReady} times out.
 * Also bounds {@code waitForHddsDatanodesStop}, which shares the field.
 *
 * @param timeoutInMs timeout value in milliseconds
 */
@Override
public void setWaitForClusterToBeReadyTimeout(int timeoutInMs) {
waitForClusterToBeReadyTimeout = timeoutInMs;
}
/**
 * Waits for SCM to be out of Safe Mode. Many tests can only be run once
 * the cluster is out of Safe Mode. Polls every 100 ms, up to 45 seconds.
 *
 * @throws TimeoutException if SCM stays in safe mode for 45 seconds
 * @throws InterruptedException if the wait is interrupted
 */
@Override
public void waitTobeOutOfSafeMode()
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(() -> {
    if (!scm.isInSafeMode()) {
      return true;
    }
    // Fixed message: the old text said "No datanodes found", which is
    // unrelated to what this method actually waits for.
    LOG.info("Waiting for SCM to exit safe mode");
    return false;
  }, 100, 1000 * 45);
}
/** @return the cluster's StorageContainerManager instance. */
@Override
public StorageContainerManager getStorageContainerManager() {
return this.scm;
}
/** @return the OzoneManager, or null when built without one (HA path). */
@Override
public OzoneManager getOzoneManager() {
return this.ozoneManager;
}
// Note: returns the live internal list, not a copy; callers can mutate it.
@Override
public List<HddsDatanodeService> getHddsDatanodes() {
return hddsDatanodes;
}
/** @return the Recon server, or null if the cluster runs without Recon. */
@Override
public ReconServer getReconServer() {
return this.reconServer;
}
/**
 * Returns the index of the datanode service whose DatanodeDetails equal
 * the given details.
 *
 * @param dn details identifying the datanode to look up
 * @return index of the matching datanode in {@code hddsDatanodes}
 * @throws IOException if no datanode with the given details exists
 */
@Override
public int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
  // Single indexed pass; the original iterated and then called
  // indexOf(service) per element — an accidental O(n^2).
  for (int i = 0; i < hddsDatanodes.size(); i++) {
    if (hddsDatanodes.get(i).getDatanodeDetails().equals(dn)) {
      return i;
    }
  }
  throw new IOException(
      "Not able to find datanode with datanode Id " + dn.getUuid());
}
/** @return a new RPC-based OzoneClient built from the cluster config. */
@Override
public OzoneClient getClient() throws IOException {
return OzoneClientFactory.getRpcClient(conf);
}
/** @return a new RPC-based OzoneClient; same behavior as getClient(). */
@Override
public OzoneClient getRpcClient() throws IOException {
return OzoneClientFactory.getRpcClient(conf);
}
/**
 * Returns an RPC proxy connected to this cluster's StorageContainerManager
 * for accessing container location information. Callers take ownership of
 * the proxy and must close it when done.
 *
 * @return RPC proxy for accessing container location information
 * @throws IOException if there is an I/O error
 */
@Override
public StorageContainerLocationProtocolClientSideTranslatorPB
getStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getClientRpcAddress();
LOG.info(
"Creating StorageContainerLocationProtocol RPC client with address {}",
address);
// Proxy runs as the current user, with the default socket factory and
// the RPC timeout taken from the cluster configuration.
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/**
 * Stops the current SCM, creates a fresh instance from the same
 * configuration, and starts it.
 *
 * @param waitForDatanode if true, block until all datanodes re-register
 *     and the cluster leaves safe mode
 */
@Override
public void restartStorageContainerManager(boolean waitForDatanode)
throws TimeoutException, InterruptedException, IOException,
AuthenticationException {
scm.stop();
scm.join();
// Recreate rather than restart: a brand-new SCM is built from conf.
scm = StorageContainerManager.createSCM(conf);
scm.start();
if (waitForDatanode) {
waitForClusterToBeReady();
}
}
/** Stops and restarts the OzoneManager in place. */
@Override
public void restartOzoneManager() throws IOException {
ozoneManager.stop();
ozoneManager.restart();
}
/** Stops the current Recon server and starts a brand-new instance. */
@Override
public void restartReconServer() {
stopRecon(reconServer);
startRecon();
}
// Waits until SCM's HEALTHY node count equals the CURRENT size of
// hddsDatanodes. Called from restartHddsDatanode after the stopped node
// has been removed from the list, so this effectively waits for SCM to
// notice that the stopped node is gone.
private void waitForHddsDatanodesStop() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() -> {
final int healthy = scm.getNodeCount(HEALTHY);
boolean isReady = healthy == hddsDatanodes.size();
if (!isReady) {
LOG.info("Waiting on {} datanodes out of {} to be marked unhealthy.",
healthy, hddsDatanodes.size());
}
return isReady;
}, 1000, waitForClusterToBeReadyTimeout);
}
/**
 * Restarts the i-th datanode with a fresh service instance, pinning the
 * STANDALONE and RATIS ports so the node keeps the same endpoints across
 * the restart.
 *
 * @param i index of the datanode in {@code hddsDatanodes}
 * @param waitForDatanode if true, wait for the node to drop out of and
 *     then rejoin SCM's healthy node list
 */
@Override
public void restartHddsDatanode(int i, boolean waitForDatanode)
throws InterruptedException, TimeoutException {
HddsDatanodeService datanodeService = hddsDatanodes.get(i);
stopDatanode(datanodeService);
// ensure same ports are used across restarts.
OzoneConfiguration config = datanodeService.getConf();
int currentPort = datanodeService.getDatanodeDetails()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = datanodeService.getDatanodeDetails()
.getPort(DatanodeDetails.Port.Name.RATIS).getValue();
config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
// Remove before waiting so waitForHddsDatanodesStop compares SCM's
// healthy count against the reduced list size.
hddsDatanodes.remove(i);
if (waitForDatanode) {
// wait for node to be removed from SCM healthy node list.
waitForHddsDatanodesStop();
}
String[] args = new String[] {};
HddsDatanodeService service =
HddsDatanodeService.createHddsDatanodeService(args);
// Re-insert at the same index so index-based lookups stay stable.
hddsDatanodes.add(i, service);
service.start(config);
if (waitForDatanode) {
// wait for the node to be identified as a healthy node again.
waitForClusterToBeReady();
}
}
/** Restarts the datanode matching the given details. */
@Override
public void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
throws InterruptedException, TimeoutException, IOException {
restartHddsDatanode(getHddsDatanodeIndex(dn), waitForDatanode);
}
/** Stops the i-th datanode; it remains in {@code hddsDatanodes}. */
@Override
public void shutdownHddsDatanode(int i) {
stopDatanode(hddsDatanodes.get(i));
}
/** Stops the datanode matching the given details. */
@Override
public void shutdownHddsDatanode(DatanodeDetails dn) throws IOException {
shutdownHddsDatanode(getHddsDatanodeIndex(dn));
}
/**
 * Stops all services and deletes the cluster's temp directory. The base
 * directory is resolved BEFORE stop() because obtaining the cluster id
 * requires an RPC to the still-running SCM.
 */
@Override
public void shutdown() {
try {
LOG.info("Shutting down the Mini Ozone Cluster");
File baseDir = new File(GenericTestUtils.getTempPath(
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
scm.getClientProtocolServer().getScmInfo().getClusterId()));
stop();
FileUtils.deleteDirectory(baseDir);
ContainerCache.getInstance(conf).shutdownCache();
DefaultMetricsSystem.shutdown();
} catch (IOException e) {
LOG.error("Exception while shutting down the cluster.", e);
}
}
/**
 * Stops all cluster components in order: OM, datanodes, SCM, Recon.
 * Each stop helper tolerates a null component.
 */
@Override
public void stop() {
LOG.info("Stopping the Mini Ozone Cluster");
stopOM(ozoneManager);
stopDatanodes(hddsDatanodes);
stopSCM(scm);
stopRecon(reconServer);
}
/**
 * Starts the StorageContainerManager.
 *
 * @throws IOException if SCM fails to start
 */
@Override
public void startScm() throws IOException {
scm.start();
}
/**
 * Starts every datanode service, wiring in the CA certificate client
 * before each one is started.
 */
@Override
public void startHddsDatanodes() {
  for (HddsDatanodeService datanode : hddsDatanodes) {
    datanode.setCertificateClient(getCAClient());
    datanode.start();
  }
}
/**
 * Shuts down every datanode in the cluster, logging (rather than
 * propagating) any failure to look up an individual node.
 */
@Override
public void shutdownHddsDatanodes() {
  for (HddsDatanodeService datanode : hddsDatanodes) {
    try {
      shutdownHddsDatanode(datanode.getDatanodeDetails());
    } catch (IOException e) {
      LOG.error("Exception while trying to shutdown datanodes:", e);
    }
  }
}
/** Creates and starts a brand-new Recon server instance. */
@Override
public void startRecon() {
reconServer = new ReconServer();
reconServer.execute(new String[]{});
}
/** Stops the current Recon server, if any. */
@Override
public void stopRecon() {
stopRecon(reconServer);
}
// Certificate client given to datanodes at start; may be null when
// security is disabled.
private CertificateClient getCAClient() {
return this.caClient;
}
private void setCAClient(CertificateClient client) {
this.caClient = client;
}
/** Stops all given datanodes in parallel; no-op for an empty collection. */
private static void stopDatanodes(
    Collection<HddsDatanodeService> hddsDatanodes) {
  if (hddsDatanodes.isEmpty()) {
    return;
  }
  LOG.info("Stopping the HddsDatanodes");
  hddsDatanodes.parallelStream()
      .forEach(MiniOzoneClusterImpl::stopDatanode);
}
/** Stops and joins a single datanode service; tolerates null. */
private static void stopDatanode(HddsDatanodeService dn) {
  if (dn == null) {
    return;
  }
  dn.stop();
  dn.join();
}
/** Stops and joins the SCM; tolerates null. */
private static void stopSCM(StorageContainerManager scm) {
  if (scm == null) {
    return;
  }
  LOG.info("Stopping the StorageContainerManager");
  scm.stop();
  scm.join();
}
/** Stops and joins the OzoneManager; tolerates null. */
private static void stopOM(OzoneManager om) {
  if (om == null) {
    return;
  }
  LOG.info("Stopping the OzoneManager");
  om.stop();
  om.join();
}
/** Stops and joins Recon; tolerates null and logs any stop failure. */
private static void stopRecon(ReconServer reconServer) {
  if (reconServer == null) {
    return;
  }
  try {
    LOG.info("Stopping Recon");
    reconServer.stop();
    reconServer.join();
  } catch (Exception e) {
    LOG.error("Exception while shutting down Recon.", e);
  }
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder extends MiniOzoneCluster.Builder {
/**
 * Creates a new Builder.
 *
 * @param conf configuration used by every component of the cluster
 */
public Builder(OzoneConfiguration conf) {
super(conf);
}
/**
 * Builds the cluster: starts SCM, then OM, optionally Recon, then creates
 * the datanodes (started only when startDataNodes is set). On failure the
 * partially started components are stopped and the working directory is
 * removed before rethrowing.
 *
 * @return the assembled MiniOzoneCluster
 * @throws IOException if any component fails to initialize or start
 */
@Override
public MiniOzoneCluster build() throws IOException {
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm = null;
OzoneManager om = null;
ReconServer reconServer = null;
List<HddsDatanodeService> hddsDatanodes = Collections.emptyList();
try {
scm = createSCM();
scm.start();
om = createOM();
if (certClient != null) {
om.setCertClient(certClient);
}
om.start();
if (includeRecon) {
configureRecon();
reconServer = new ReconServer();
reconServer.execute(new String[] {});
}
hddsDatanodes = createHddsDatanodes(scm, reconServer);
MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
hddsDatanodes, reconServer);
cluster.setCAClient(certClient);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
return cluster;
} catch (Exception ex) {
// Best-effort cleanup of whatever was started before the failure.
stopOM(om);
if (includeRecon) {
stopRecon(reconServer);
}
if (startDataNodes) {
stopDatanodes(hddsDatanodes);
}
stopSCM(scm);
removeConfiguration();
// Rethrow with the original type where callers may depend on it.
if (ex instanceof IOException) {
throw (IOException) ex;
}
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}
}
/**
 * Initializes the configuration required for starting MiniOzoneCluster:
 * the metadata directory, client stream-buffer sizes, chunk/block sizes
 * and the global Ratis pipeline limit.
 *
 * @throws IOException if the metadata directory cannot be created
 */
protected void initializeConfiguration() throws IOException {
Path metaDir = Paths.get(path, "ozone-meta");
Files.createDirectories(metaDir);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
if (!chunkSize.isPresent()) {
//set it to 1MB by default in tests
chunkSize = Optional.of(1);
}
// Derived defaults: buffer = chunk, flush = chunk, max = 2 x flush,
// block = 2 x max; all in streamBufferSizeUnit (default MB).
if (!streamBufferSize.isPresent()) {
streamBufferSize = OptionalInt.of(chunkSize.get());
}
if (!streamBufferFlushSize.isPresent()) {
streamBufferFlushSize = Optional.of((long) chunkSize.get());
}
if (!streamBufferMaxSize.isPresent()) {
streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
}
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
if (!streamBufferSizeUnit.isPresent()) {
streamBufferSizeUnit = Optional.of(StorageUnit.MB);
}
// Convert the chosen sizes to bytes and push them into the client config.
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamBufferSize(
(int) Math.round(
streamBufferSizeUnit.get().toBytes(streamBufferSize.getAsInt())));
clientConfig.setStreamBufferMaxSize(Math.round(
streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get())));
clientConfig.setStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
conf.setFromObject(clientConfig);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
chunkSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
// MiniOzoneCluster should have global pipeline upper limit.
conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
pipelineNumLimit >= DEFAULT_PIPELIME_LIMIT ?
pipelineNumLimit : DEFAULT_PIPELIME_LIMIT);
configureTrace();
}
// Deletes the builder's working directory; failures are ignored quietly.
void removeConfiguration() {
FileUtils.deleteQuietly(new File(path));
}
/**
 * Creates a new StorageContainerManager instance after configuring its
 * endpoints and initializing its on-disk storage. The SCM is not started.
 *
 * @return {@link StorageContainerManager}
 * @throws IOException if storage initialization fails
 * @throws AuthenticationException if security setup fails
 */
protected StorageContainerManager createSCM()
throws IOException, AuthenticationException {
configureSCM();
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
initializeScmStorage(scmStore);
StorageContainerManager scm = StorageContainerManager.createSCM(conf);
HealthyPipelineSafeModeRule rule =
scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule();
if (rule != null) {
// Set threshold to wait for safe mode exit - this is needed since a
// pipeline is marked open only after leader election.
rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3);
}
return scm;
}
/**
 * Initializes SCM's on-disk storage with the builder's cluster id and an
 * SCM id (generated lazily when none was supplied). No-op when the
 * storage is already initialized.
 */
private void initializeScmStorage(SCMStorageConfig scmStore)
    throws IOException {
  if (scmStore.getState() == StorageState.INITIALIZED) {
    return;
  }
  scmStore.setClusterId(clusterId);
  // Generate an SCM id only when the builder did not supply one.
  scmId = Optional.of(scmId.orElseGet(() -> UUID.randomUUID().toString()));
  scmStore.setScmId(scmId.get());
  scmStore.initialize();
}
// Initializes OM's on-disk storage; no-op when already initialized.
// NOTE(review): calls scmId.get() directly — assumes initializeScmStorage
// has already populated the id (true on the build() path); verify before
// adding other callers.
void initializeOmStorage(OMStorage omStorage) throws IOException {
if (omStorage.getState() == StorageState.INITIALIZED) {
return;
}
omStorage.setClusterId(clusterId);
omStorage.setScmId(scmId.get());
omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
// Initialize ozone certificate client if security is enabled.
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
OzoneManager.initializeSecurity(conf, omStorage);
}
omStorage.initialize();
}
/**
 * Creates a new OzoneManager instance after configuring its endpoints and
 * initializing its on-disk storage. The OM is not started.
 *
 * @return {@link OzoneManager}
 * @throws IOException if storage initialization fails
 * @throws AuthenticationException if security setup fails
 */
protected OzoneManager createOM()
throws IOException, AuthenticationException {
configureOM();
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
return OzoneManager.createOm(conf);
}
/**
 * Creates HddsDatanodeService(s) instance. Each datanode gets its own base
 * directory holding metadata, one or more data volumes, a Ratis storage
 * dir and a replication work dir. The services are created, not started.
 *
 * @return List of HddsDatanodeService
 * @throws IOException if a datanode directory cannot be created
 */
protected List<HddsDatanodeService> createHddsDatanodes(
StorageContainerManager scm, ReconServer reconServer)
throws IOException {
configureHddsDatanodes();
String scmAddress = scm.getDatanodeRpcAddress().getHostString() +
":" + scm.getDatanodeRpcAddress().getPort();
String[] args = new String[] {};
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
for (int i = 0; i < numOfDatanodes; i++) {
// Per-node copy of conf so each datanode's directories don't collide.
OzoneConfiguration dnConf = new OzoneConfiguration(conf);
String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
Path metaDir = Paths.get(datanodeBaseDir, "meta");
List<String> dataDirs = new ArrayList<>();
for (int j = 0; j < numDataVolumes; j++) {
Path dir = Paths.get(datanodeBaseDir, "data-" + j, "containers");
Files.createDirectories(dir);
dataDirs.add(dir.toString());
}
String listOfDirs = String.join(",", dataDirs);
Path ratisDir = Paths.get(datanodeBaseDir, "data", "ratis");
Path workDir = Paths.get(datanodeBaseDir, "data", "replication",
"work");
Files.createDirectories(metaDir);
Files.createDirectories(ratisDir);
Files.createDirectories(workDir);
dnConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
dnConf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY, listOfDirs);
dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
ratisDir.toString());
dnConf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
workDir.toString());
if (reconServer != null) {
// Point the datanode at Recon's datanode RPC endpoint.
OzoneStorageContainerManager reconScm =
reconServer.getReconStorageContainerManager();
dnConf.set(OZONE_RECON_ADDRESS_KEY,
reconScm.getDatanodeRpcAddress().getHostString() + ":" +
reconScm.getDatanodeRpcAddress().getPort());
}
HddsDatanodeService datanode
= HddsDatanodeService.createHddsDatanodeService(args);
datanode.setConfiguration(dnConf);
hddsDatanodes.add(datanode);
}
return hddsDatanodes;
}
// Binds all SCM endpoints to loopback with ephemeral ports (":0") and
// shortens the post-safe-mode wait for faster tests.
private void configureSCM() {
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
"3s");
configureSCMheartbeat();
}
/**
 * Applies the heartbeat interval and heartbeat-processor interval,
 * falling back to the builder defaults when not supplied.
 */
private void configureSCMheartbeat() {
  final long heartbeatMs = hbInterval.isPresent()
      ? hbInterval.get() : DEFAULT_HB_INTERVAL_MS;
  conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, heartbeatMs,
      TimeUnit.MILLISECONDS);
  final long processorMs = hbProcessorInterval.isPresent()
      ? hbProcessorInterval.get() : DEFAULT_HB_PROCESSOR_INTERVAL_MS;
  conf.setTimeDuration(
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
      processorMs,
      TimeUnit.MILLISECONDS);
}
// Binds OM endpoints to loopback with ephemeral ports and sets the RPC
// handler count.
private void configureOM() {
conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, "127.0.0.1:0");
conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
}
// Configures datanode endpoints with ephemeral ports and enables random
// container/Ratis ports when requested by the builder.
private void configureHddsDatanodes() {
conf.set(ScmConfigKeys.HDDS_REST_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(HddsConfigKeys.HDDS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
randomContainerPort);
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
randomContainerPort);
// Replication server on an ephemeral port.
conf.setFromObject(new ReplicationConfig().setPort(0));
}
/**
 * Configures log tracing for the cluster.
 * <p>
 * Bug fix: the original unconditionally reset the root log level to INFO
 * after setting TRACE, so a requested TRACE level never took effect; it
 * also set TRACE even when {@code enableTrace} was explicitly false. Now
 * TRACE is used only when tracing was requested and enabled.
 */
private void configureTrace() {
  if (enableTrace.isPresent()) {
    conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
        enableTrace.get());
    GenericTestUtils.setRootLogLevel(
        enableTrace.get() ? Level.TRACE : Level.INFO);
  } else {
    GenericTestUtils.setRootLogLevel(Level.INFO);
  }
}
/**
 * Configures Recon to use throwaway directories and an embedded Derby DB,
 * with HTTP and datanode endpoints on ephemeral ports.
 * <p>
 * NOTE(review): TemporaryFolder is a JUnit rule used here outside a rule
 * lifecycle, so the created folder is never auto-deleted — confirm this
 * leak is acceptable for test runs.
 */
protected void configureRecon() throws IOException {
ConfigurationProvider.resetConfiguration();
TemporaryFolder tempFolder = new TemporaryFolder();
tempFolder.create();
File tempNewFolder = tempFolder.newFolder();
conf.set(OZONE_RECON_DB_DIR,
tempNewFolder.getAbsolutePath());
conf.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, tempNewFolder
.getAbsolutePath());
conf.set(OZONE_RECON_SCM_DB_DIR,
tempNewFolder.getAbsolutePath());
ReconSqlDbConfig dbConfig = conf.getObject(ReconSqlDbConfig.class);
dbConfig.setJdbcUrl("jdbc:derby:" + tempNewFolder.getAbsolutePath()
+ "/ozone_recon_derby.db");
conf.setFromObject(dbConfig);
conf.set(OZONE_RECON_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(OZONE_RECON_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
ConfigurationProvider.setConfiguration(conf);
}
}
}