blob: db14991c799e2f9746771203596ab8440b483019 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.service.impl;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.shufflehandler.ShuffleHandler;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.slf4j.LoggerFactory;
public class TezTestService extends AbstractService implements ContainerRunner {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TezTestService.class);
private final Configuration shuffleHandlerConf;
private final int numExecutors;
private final TezTestServiceProtocolServerImpl server;
private final ContainerRunnerImpl containerRunner;
private final String[] localDirs;
private final AtomicInteger numSubmissions = new AtomicInteger(0);
private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
private final TezExecutors sharedExecutor;
public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) {
super(TezTestService.class.getSimpleName());
this.numExecutors = numExecutors;
this.localDirs = localDirs;
long memoryAvailableBytes = memoryAvailable;
long jvmMax = Runtime.getRuntime().maxMemory();
LOG.info(TezTestService.class.getSimpleName() + " created with the following configuration: " +
"numExecutors=" + numExecutors +
", workDirs=" + Arrays.toString(localDirs) +
", memoryAvailable=" + memoryAvailable +
", jvmMaxMemory=" + jvmMax);
Preconditions.checkArgument(this.numExecutors > 0);
Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0,
"Work dirs must be specified");
Preconditions.checkState(jvmMax >= memoryAvailableBytes,
"Invalid configuration. Xmx value too small. maxAvailable=" + jvmMax + ", configured=" +
memoryAvailableBytes);
this.shuffleHandlerConf = new Configuration(conf);
// Start Shuffle on a random port
this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs));
this.server = new TezTestServiceProtocolServerImpl(this, address);
this.sharedExecutor = new TezSharedExecutor(conf);
this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address,
memoryAvailableBytes, sharedExecutor);
}
@Override
public void serviceInit(Configuration conf) {
server.init(conf);
containerRunner.init(conf);
}
@Override
public void serviceStart() throws Exception {
ShuffleHandler.initializeAndStart(shuffleHandlerConf);
String auxiliaryService = getConfig().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
containerRunner.setShufflePort(auxiliaryService, ShuffleHandler.get().getPort());
server.start();
containerRunner.start();
}
public void serviceStop() throws Exception {
containerRunner.stop();
server.stop();
ShuffleHandler.get().stop();
sharedExecutor.shutdownNow();
}
public InetSocketAddress getListenerAddress() {
return server.getBindAddress();
}
public int getShufflePort() {
return ShuffleHandler.get().getPort();
}
@Override
public void queueContainer(RunContainerRequestProto request) throws TezException {
numSubmissions.incrementAndGet();
containerRunner.queueContainer(request);
}
@Override
public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
TezException {
numSubmissions.incrementAndGet();
containerRunner.submitWork(request);
}
public int getNumSubmissions() {
return numSubmissions.get();
}
}