blob: d7d0f01126114cfeff89d6bc89d5957e8aa5b354 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tez.common.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.service.TezTestServiceConfConstants;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TezTestServiceTaskSchedulerService extends TaskScheduler {
private static final Logger
LOG = LoggerFactory.getLogger(TezTestServiceTaskSchedulerService.class);
private final List<String> serviceHosts;
private final ContainerFactory containerFactory;
private final Random random = new Random();
// Currently all services must be running on the same port.
private final int containerPort;
private final ConcurrentMap<Object, ContainerId> runningTasks =
new ConcurrentHashMap<Object, ContainerId>();
// AppIdIdentifier to avoid conflicts with other containers in the system.
// Per instance
private final int memoryPerInstance;
private final int coresPerInstance;
private final int executorsPerInstance;
// Per Executor Thread
private final Resource resourcePerContainer;
// Not registering with the RM. Assuming the main TezScheduler will always run (except local mode),
// and take care of YARN registration.
public TezTestServiceTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
// Accepting configuration here to allow setting up fields as final
super(taskSchedulerContext);
this.serviceHosts = new LinkedList<String>();
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
Configuration conf = null;
try {
conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
} catch (IOException e) {
throw new TezUncheckedException(e);
}
this.memoryPerInstance = conf
.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
Preconditions.checkArgument(memoryPerInstance > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
" must be configured");
this.executorsPerInstance = conf.getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
-1);
Preconditions.checkArgument(executorsPerInstance > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
" must be configured");
this.coresPerInstance = conf
.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
executorsPerInstance);
this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
Preconditions.checkArgument(executorsPerInstance > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
if (hosts == null || hosts.length == 0) {
hosts = new String[]{"localhost"};
}
for (String host : hosts) {
serviceHosts.add(host);
}
LOG.info("Running with configuration: " +
"memoryPerInstance=" + memoryPerInstance +
", vcoresPerInstance=" + coresPerInstance +
", executorsPerInstance=" + executorsPerInstance +
", resourcePerContainerInferred=" + resourcePerContainer +
", hosts=" + serviceHosts.toString());
}
@Override
public Resource getAvailableResources() {
// TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
return Resource
.newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
serviceHosts.size() * coresPerInstance);
}
@Override
public int getClusterNodeCount() {
return serviceHosts.size();
}
@Override
public void dagComplete() {
}
@Override
public Resource getTotalResources() {
return Resource
.newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
serviceHosts.size() * coresPerInstance);
}
@Override
public void blacklistNode(NodeId nodeId) {
LOG.info("BlacklistNode not supported");
}
@Override
public void unblacklistNode(NodeId nodeId) {
LOG.info("unBlacklistNode not supported");
}
@Override
public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
Priority priority, Object containerSignature, Object clientCookie) {
String host = selectHost(hosts);
Container container =
containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
runningTasks.put(task, container.getId());
getContext().taskAllocated(task, clientCookie, container);
}
@Override
public void allocateTask(Object task, Resource capability, ContainerId containerId,
Priority priority, Object containerSignature, Object clientCookie) {
String host = selectHost(null);
Container container =
containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
runningTasks.put(task, container.getId());
getContext().taskAllocated(task, clientCookie, container);
}
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
ContainerId containerId = runningTasks.remove(task);
if (containerId == null) {
LOG.error("Could not determine ContainerId for task: " + task +
" . Could have hit a race condition. Ignoring." +
" The query may hang since this \"unknown\" container is now taking up a slot permanently");
return false;
}
getContext().containerBeingReleased(containerId);
return true;
}
@Override
public Object deallocateContainer(ContainerId containerId) {
LOG.info("Ignoring deallocateContainer for containerId: " + containerId);
return null;
}
@Override
public void setShouldUnregister() {
}
@Override
public boolean hasUnregistered() {
// Nothing to do. No registration involved.
return true;
}
private String selectHost(String[] requestedHosts) {
String host;
if (requestedHosts != null && requestedHosts.length > 0) {
Arrays.sort(requestedHosts);
host = requestedHosts[0];
LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
} else {
host = serviceHosts.get(random.nextInt(serviceHosts.size()));
LOG.info("Selected random host: " + host + " since the request contained no host information");
}
return host;
}
static class ContainerFactory {
AtomicInteger nextId;
final ApplicationAttemptId customAppAttemptId;
public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
this.nextId = new AtomicInteger(1);
ApplicationId appId = ApplicationId
.newInstance(appIdLong, appAttemptId.getApplicationId().getId());
this.customAppAttemptId = ApplicationAttemptId
.newInstance(appId, appAttemptId.getAttemptId());
}
@SuppressWarnings("deprecation")
public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, port);
String nodeHttpAddress = "hostname:0";
Container container = Container.newInstance(containerId,
nodeId,
nodeHttpAddress,
capability,
priority,
null);
return container;
}
}
}