blob: 3a6935f52bbef71bab65bf0ccc7483d5a25c7d8f [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.service;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.tez.service.impl.TezTestService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MiniTezTestServiceCluster extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(MiniTezTestServiceCluster.class);
private final File testWorkDir;
private final long availableMemory;
private final int numExecutorsPerService;
private final String[] localDirs;
private final Configuration clusterSpecificConfiguration = new Configuration(false);
private TezTestService tezTestService;
public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
}
// TODO Add support for multiple instances
private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
super(clusterName + "_TezTestServerCluster");
Preconditions.checkArgument(numExecutorsPerService > 0);
Preconditions.checkArgument(availableMemory > 0);
Preconditions.checkArgument(numLocalDirs > 0);
String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
File targetWorkDir = new File("target", clusterNameTrimmed);
try {
FileContext.getLocalFSFileContext().delete(
new Path(targetWorkDir.getAbsolutePath()), true);
} catch (Exception e) {
LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
}
if (Shell.WINDOWS) {
// The test working directory can exceed the maximum path length supported
// by some Windows APIs and cmd.exe (260 characters). To work around this,
// create a symlink in temporary storage with a much shorter path,
// targeting the full path to the test working directory. Then, use the
// symlink as the test working directory.
String targetPath = targetWorkDir.getAbsolutePath();
File link = new File(System.getProperty("java.io.tmpdir"),
String.valueOf(System.currentTimeMillis()));
String linkPath = link.getAbsolutePath();
try {
FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
} catch (IOException e) {
throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
}
// Guarantee target exists before creating symlink.
targetWorkDir.mkdirs();
Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
Shell.getSymlinkCommand(targetPath, linkPath));
try {
shexec.execute();
} catch (IOException e) {
throw new YarnRuntimeException(String.format(
"failed to create symlink from %s to %s, shell output: %s", linkPath,
targetPath, shexec.getOutput()), e);
}
this.testWorkDir = link;
} else {
this.testWorkDir = targetWorkDir;
}
this.numExecutorsPerService = numExecutorsPerService;
this.availableMemory = availableMemory;
// Setup Local Dirs
localDirs = new String[numLocalDirs];
for (int i = 0 ; i < numLocalDirs ; i++) {
File f = new File(testWorkDir, "localDir");
f.mkdirs();
LOG.info("Created localDir: " + f.getAbsolutePath());
localDirs[i] = f.getAbsolutePath();
}
}
@Override
public void serviceInit(Configuration conf) {
tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
tezTestService.init(conf);
}
@Override
public void serviceStart() {
tezTestService.start();
clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
getServiceAddress().getHostName());
clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
getServiceAddress().getPort());
clusterSpecificConfiguration.setInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
numExecutorsPerService);
clusterSpecificConfiguration.setLong(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
}
@Override
public void serviceStop() {
if (tezTestService != null) {
tezTestService.stop();
tezTestService = null;
}
}
/**
* return the address at which the service is listening
* @return host:port
*/
public InetSocketAddress getServiceAddress() {
Preconditions.checkState(getServiceState() == STATE.STARTED);
return tezTestService.getListenerAddress();
}
public int getShufflePort() {
Preconditions.checkState(getServiceState() == STATE.STARTED);
return tezTestService.getShufflePort();
}
public Configuration getClusterSpecificConfiguration() {
Preconditions.checkState(getServiceState() == STATE.STARTED);
return clusterSpecificConfiguration;
}
// Mainly for verification
public int getNumSubmissions() {
return tezTestService.getNumSubmissions();
}
}