blob: c3831223c799222d61800a5840ea319bdf717506 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.tests;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.test.MiniTezCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test helper that starts the mini clusters used by the external Tez service tests (a
 * {@link MiniDFSCluster}, a {@link MiniTezCluster} and a {@link MiniTezTestServiceCluster}) and
 * optionally manages a shared, session-mode {@link TezClient}.
 *
 * <p>Current usage: create the helper and call {@link #setupSharedTezClient} during setup
 * (beforeClass), then invoke {@link #tearDownAll()} when done (afterClass). Alternatively, the
 * shared client can be torn down on its own via {@link #shutdownSharedTezClient()}.
 */
public class ExternalTezServiceTestHelper {

  private static final Logger LOG = LoggerFactory.getLogger(ExternalTezServiceTestHelper.class);

  private volatile MiniTezCluster tezCluster;
  private volatile MiniDFSCluster dfsCluster;
  private volatile MiniTezTestServiceCluster tezTestServiceCluster;

  private volatile Configuration clusterConf = new Configuration();
  private volatile Configuration confForJobs;

  private volatile FileSystem remoteFs;

  private volatile TezClient sharedTezClient;

  /**
   * Starts the mini DFS cluster, the mini Tez cluster and the Tez test service cluster, and
   * builds the configuration that jobs should be submitted with.
   *
   * <p>If any startup step fails, whatever was already started is stopped (best effort) before
   * the failure is rethrown, because the caller never obtains a reference on which it could call
   * {@link #tearDownAll()}.
   *
   * @param testRootDir value stored under {@code MiniDFSCluster.HDFS_MINIDFS_BASEDIR} for the
   *     mini DFS cluster
   * @throws IOException if a startup step fails with an I/O error
   * @throws RuntimeException wrapping the {@link IOException} if the mini DFS cluster itself
   *     cannot be started
   */
  public ExternalTezServiceTestHelper(String testRootDir) throws
      IOException {
    try {
      startDfsCluster(testRootDir);
      startTezCluster();
      startTezTestServiceCluster();
      prepareConfForJobs();
    } catch (IOException | RuntimeException startupFailure) {
      // Construction is about to fail: release what was started so it does not outlive the test.
      stopQuietlyAfterFailedStartup(startupFailure);
      throw startupFailure;
    }
  }

  /** Starts the single-datanode mini DFS cluster and captures its file system. */
  private void startDfsCluster(String testRootDir) {
    try {
      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testRootDir);
      dfsCluster =
          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
      remoteFs = dfsCluster.getFileSystem();
      LOG.info("MiniDFSCluster started");
    } catch (IOException io) {
      throw new RuntimeException("problem starting mini dfs cluster", io);
    }
  }

  /** Starts the mini Tez cluster on top of the mini DFS and merges its config into clusterConf. */
  private void startTezCluster() throws IOException {
    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
    tezCluster.init(conf);
    tezCluster.start();
    LOG.info("MiniTezCluster started");

    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
      clusterConf.set(entry.getKey(), entry.getValue());
    }
  }

  /** Starts the Tez test service cluster using the merged cluster configuration. */
  private void startTezTestServiceCluster() throws IOException {
    long jvmMax = Runtime.getRuntime().maxMemory();
    // Half of this JVM's maximum memory is passed to the test service cluster factory.
    tezTestServiceCluster = MiniTezTestServiceCluster
        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
    tezTestServiceCluster.init(clusterConf);
    tezTestServiceCluster.start();
    LOG.info("MiniTezTestServer started");
  }

  /** Builds the job configuration and creates the Tez staging directory on the remote FS. */
  private void prepareConfForJobs() throws IOException {
    confForJobs = new Configuration(clusterConf);
    for (Map.Entry<String, String> entry : tezTestServiceCluster
        .getClusterSpecificConfiguration()) {
      confForJobs.set(entry.getKey(), entry.getValue());
    }

    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
    remoteFs.mkdirs(stagingDirPath);
    // This is currently configured to push tasks into the Service, and then use the standard RPC
    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
    confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
  }

  /**
   * Best-effort shutdown used when construction fails; never throws. Any cleanup error is
   * attached to {@code startupFailure} as a suppressed exception so the original cause stays
   * primary.
   */
  private void stopQuietlyAfterFailedStartup(Exception startupFailure) {
    try {
      stopEverything();
    } catch (Exception cleanupFailure) {
      // Deliberately broad: cleanup problems must not mask the startup failure.
      if (cleanupFailure != startupFailure) {
        startupFailure.addSuppressed(cleanupFailure);
      }
    }
  }

  /**
   * Creates and starts the shared session-mode Tez client, then waits until it is ready.
   *
   * @param servicePluginsDescriptor plugin descriptor handed to the client builder
   * @throws IOException propagated from starting or waiting on the client
   * @throws TezException propagated from starting or waiting on the client
   * @throws InterruptedException if interrupted while waiting for the session to become ready
   */
  public void setupSharedTezClient(ServicePluginsDescriptor servicePluginsDescriptor) throws
      IOException, TezException, InterruptedException {
    // Create a session to use for all tests.
    TezConfiguration tezClientConf = new TezConfiguration(confForJobs);

    sharedTezClient = TezClient
        .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();

    sharedTezClient.start();
    LOG.info("Shared TezSession started");
    sharedTezClient.waitTillReady();
    LOG.info("Shared TezSession ready for submission");
  }

  /**
   * Stops the shared Tez client (if any) and then every mini cluster, in reverse start order.
   *
   * <p>Each resource is stopped even if stopping an earlier one throws, so a failing shutdown
   * step no longer leaves the remaining clusters running. If several steps fail, the exception
   * from the last failing step is the one that propagates.
   *
   * @throws IOException propagated from stopping the shared Tez client
   * @throws TezException propagated from stopping the shared Tez client
   */
  public void tearDownAll() throws IOException, TezException {
    stopEverything();
  }

  /**
   * Stops only the shared Tez client, if one is set, and clears the reference.
   *
   * @throws IOException propagated from stopping the client
   * @throws TezException propagated from stopping the client
   */
  public void shutdownSharedTezClient() throws IOException, TezException {
    stopSharedTezClient();
  }

  /** Stops the shared client; the reference is cleared even when stop() throws. */
  private void stopSharedTezClient() throws IOException, TezException {
    try {
      if (sharedTezClient != null) {
        sharedTezClient.stop();
      }
    } finally {
      sharedTezClient = null;
    }
  }

  /**
   * Stops the client, the test service cluster, the Tez cluster and the DFS cluster. The
   * try/finally chain guarantees every later step still runs when an earlier one throws.
   */
  private void stopEverything() throws IOException, TezException {
    try {
      stopSharedTezClient();
    } finally {
      try {
        if (tezTestServiceCluster != null) {
          tezTestServiceCluster.stop();
        }
      } finally {
        tezTestServiceCluster = null;
        try {
          if (tezCluster != null) {
            tezCluster.stop();
          }
        } finally {
          tezCluster = null;
          try {
            if (dfsCluster != null) {
              dfsCluster.shutdown();
            }
          } finally {
            dfsCluster = null;
          }
        }
      }
    }
  }

  /**
   * Generates join input data with {@link JoinDataGen} and then runs {@link HashJoinExample}
   * over it, both through the shared Tez client. Asserts that each run returns 0.
   *
   * @param srcDataDir directory created on the remote file system before generating data
   * @param dataPath1 first data set path (passed to both the generator and the join)
   * @param dataPath2 second data set path (passed to both the generator and the join)
   * @param expectedResultPath path passed to the generator for the expected join result
   * @param outputPath path passed to the join example for its output
   * @throws Exception if directory creation or either run fails
   */
  public void setupHashJoinData(Path srcDataDir, Path dataPath1, Path dataPath2,
                                Path expectedResultPath, Path outputPath) throws
      Exception {
    remoteFs.mkdirs(srcDataDir);
    TezConfiguration tezConf = new TezConfiguration(confForJobs);
    // Generate join data - with 2 tasks.
    JoinDataGen dataGen = new JoinDataGen();
    String[] dataGenArgs = new String[]{
        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
        expectedResultPath.toString(), "2"};
    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
    // Run the actual join - with 2 reducers
    HashJoinExample joinExample = new HashJoinExample();
    String[] args = new String[]{
        dataPath1.toString(), dataPath2.toString(), "2", outputPath.toString()};
    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
    LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
  }

  /** @return the mini Tez cluster, or {@code null} after teardown */
  public MiniTezCluster getTezCluster() {
    return tezCluster;
  }

  /** @return the mini DFS cluster, or {@code null} after teardown */
  public MiniDFSCluster getDfsCluster() {
    return dfsCluster;
  }

  /** @return the Tez test service cluster, or {@code null} after teardown */
  public MiniTezTestServiceCluster getTezTestServiceCluster() {
    return tezTestServiceCluster;
  }

  /** @return the cluster-level configuration (DFS settings merged with the Tez cluster config) */
  public Configuration getClusterConf() {
    return clusterConf;
  }

  /** @return the configuration jobs should be submitted with */
  public Configuration getConfForJobs() {
    return confForJobs;
  }

  /** @return the file system of the mini DFS cluster */
  public FileSystem getRemoteFs() {
    return remoteFs;
  }

  /**
   * @return the shared Tez client
   * @throws NullPointerException if the shared client has not been set up or was shut down
   */
  public TezClient getSharedTezClient() {
    // Single volatile read: the value that is checked is the value that is returned.
    return Objects.requireNonNull(sharedTezClient,
        "sharedTezClient is not set; call setupSharedTezClient first");
  }
}