// blob: 2eed3253fa5b8fd29037135dded332f7437e5278 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.orchestrator;
import java.util.ArrayList;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccReservation;
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
/**
 * Keeps a set of nodes, keyed by node name, and hands them out one at a time
 * in rotation through {@link #getNode()}.
 *
 * <p>Nodes may be added or removed explicitly with {@link #addNode} and
 * {@link #delNode}. When no node is currently held, {@link #nodes()} and
 * {@link #getNode()} first try to populate the set from the reservation that
 * {@link #init()} created, provided that reservation is dispatchable and its
 * reservation map is non-empty.
 *
 * <p>All access to the node collections is guarded by the monitor of
 * {@code nodeMap}. That field is {@code final} and is cleared rather than
 * replaced, so the lock object never changes while threads are using it.
 * NOTE(review): {@code duccWorkReservation} is written by {@link #init()}
 * without holding that monitor; confirm callers invoke {@code init()} before
 * other threads start calling {@code getNode()} / {@code nodes()}.
 */
public class JobDriverHostManager {

    private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JobDriverHostManager.class.getName());

    private static final OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
    private static final Messages messages = orchestratorCommonArea.getSystemMessages();
    private static final CommonConfiguration commonConfiguration = orchestratorCommonArea.getCommonConfiguration();

    private static final JobDriverHostManager hostManager = new JobDriverHostManager();

    /**
     * @return the shared instance of this manager
     */
    public static JobDriverHostManager getInstance() {
        return hostManager;
    }

    // Rotation order of node names; always holds exactly the keys of nodeMap, each once.
    private final ArrayList<String> keyList = new ArrayList<String>();
    // Node name -> node. Also serves as the lock object, hence final.
    private final TreeMap<String,NodeIdentity> nodeMap = new TreeMap<String,NodeIdentity>();
    // True while at least one node is held.
    private final AtomicBoolean assigned = new AtomicBoolean(false);
    // Reservation created by processJdHostClass(); null until init() has run.
    private DuccWorkReservation duccWorkReservation = null;

    public JobDriverHostManager() {
    }

    /**
     * Recomputes the {@code assigned} flag from the current rotation list.
     * Callers must hold the {@code nodeMap} monitor.
     */
    private void updateAssigned() {
        assigned.set(!keyList.isEmpty());
    }

    /**
     * Registers a node under its name. A {@code null} node is ignored.
     * Re-adding a name that is already present replaces the stored node but
     * keeps its single slot in the rotation.
     *
     * @param node the node to add, may be {@code null}
     */
    public void addNode(NodeIdentity node) {
        synchronized(nodeMap) {
            if(node != null) {
                String key = node.getName();
                // Only a previously unknown name gets a rotation slot; otherwise
                // the list would hold duplicates that delNode() cannot fully remove.
                if(nodeMap.put(key, node) == null) {
                    keyList.add(key);
                }
            }
            updateAssigned();
        }
    }

    /**
     * Removes the node registered under the given node's name, if any.
     * A {@code null} node is ignored.
     *
     * @param node the node to remove, may be {@code null}
     */
    public void delNode(NodeIdentity node) {
        synchronized(nodeMap) {
            if(node != null) {
                String key = node.getName();
                nodeMap.remove(key);
                keyList.remove(key);
            }
            updateAssigned();
        }
    }

    /**
     * Returns how many nodes are currently held, attempting an assignment
     * from the reservation first when none are held.
     *
     * @return the number of nodes held
     */
    public int nodes() {
        if(!assigned.get()) {
            tryAssignment();
        }
        synchronized(nodeMap) {
            return nodeMap.size();
        }
    }

    /**
     * Returns the next node in rotation and moves it to the back of the
     * rotation. When no node is held, an assignment from the reservation is
     * attempted first.
     *
     * @return the next node, or {@code null} when no node is available
     */
    public NodeIdentity getNode() {
        NodeIdentity retVal = null;
        if(!assigned.get()) {
            tryAssignment();
        }
        synchronized(nodeMap) {
            if(!keyList.isEmpty()) {
                // Rotate: take the head and re-append it at the tail.
                String key = keyList.remove(0);
                keyList.add(key);
                retVal = nodeMap.get(key);
            }
        }
        return retVal;
    }

    /**
     * Rebuilds the node set from the reservation map of the reservation
     * created by {@link #processJdHostClass()}. Does nothing when there is no
     * reservation yet, when it is not dispatchable, or when its reservation
     * map is empty. Entries without a node identity are skipped.
     */
    private void tryAssignment() {
        String methodName = "tryAssignment";
        synchronized(nodeMap) {
            if(duccWorkReservation == null || !duccWorkReservation.isDispatchable()) {
                return;
            }
            IDuccReservationMap map = duccWorkReservation.getReservationMap();
            if(map == null || map.isEmpty()) {
                return;
            }
            // Clear in place instead of allocating new collections: nodeMap is
            // the lock object and must keep its identity.
            keyList.clear();
            nodeMap.clear();
            for (DuccId key : map.keySet()) {
                IDuccReservation value = map.get(key);
                NodeIdentity node = (value == null) ? null : value.getNodeIdentity();
                if(node == null) {
                    logger.info(methodName, null, "no node identity for reservation "+key);
                    continue;
                }
                addNode(node);
                logger.info(methodName, null, messages.fetchLabel("assigned")+node.getName()+" "+node.getIp());
            }
            // Covers the case where every entry was skipped.
            updateAssigned();
        }
    }

    /**
     * Builds a reservation request from the cached {@code ducc_jd_host_*}
     * properties, creates the reservation, adds it to the orchestrator work
     * map, and moves it through the {@code Received} and
     * {@code WaitingForResources} states, saving a checkpoint after each
     * state change.
     */
    private void processJdHostClass() {
        String methodName = "processJdHostClass";
        logger.trace(methodName, null, messages.fetch("enter"));
        DuccPropertiesResolver dpr = DuccPropertiesResolver.getInstance();
        String jdHostClass = dpr.getCachedProperty(DuccPropertiesResolver.ducc_jd_host_class);
        String jdHostDescription = dpr.getCachedProperty(DuccPropertiesResolver.ducc_jd_host_description);
        String jdHostMemorySize = dpr.getCachedProperty(DuccPropertiesResolver.ducc_jd_host_memory_size);
        String jdHostNumberOfMachines = dpr.getCachedProperty(DuccPropertiesResolver.ducc_jd_host_number_of_machines);
        String jdHostUser = dpr.getCachedProperty(DuccPropertiesResolver.ducc_jd_host_user);
        ReservationRequestProperties reservationRequestProperties = new ReservationRequestProperties();
        reservationRequestProperties.put(ReservationSpecificationProperties.key_scheduling_class, jdHostClass);
        reservationRequestProperties.put(ReservationSpecificationProperties.key_description, jdHostDescription);
        reservationRequestProperties.put(ReservationSpecificationProperties.key_instance_memory_size, jdHostMemorySize);
        reservationRequestProperties.put(ReservationSpecificationProperties.key_number_of_instances, jdHostNumberOfMachines);
        reservationRequestProperties.put(ReservationSpecificationProperties.key_user, jdHostUser);
        duccWorkReservation = ReservationFactory.getInstance().create(commonConfiguration, reservationRequestProperties);
        DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
        workMap.addDuccWork(duccWorkReservation);
        // state: Received
        duccWorkReservation.stateChange(ReservationState.Received);
        OrchestratorCheckpoint.getInstance().saveState();
        // state: WaitingForResources
        duccWorkReservation.stateChange(ReservationState.WaitingForResources);
        OrchestratorCheckpoint.getInstance().saveState();
        logger.trace(methodName, null, messages.fetch("exit"));
    }

    /**
     * Unconditionally creates and registers the reservation
     * (see {@link #processJdHostClass()}).
     */
    public void init() {
        String methodName = "init";
        logger.trace(methodName, null, messages.fetch("enter"));
        processJdHostClass();
        logger.trace(methodName, null, messages.fetch("exit"));
    }

    /**
     * Calls {@link #init()} only when the orchestrator work map is empty;
     * otherwise just logs that the allocation is bypassed.
     * NOTE(review): presumably a non-empty work map means state was restored
     * and already contains the reservation; confirm against the caller.
     */
    public void conditional() {
        String methodName = "conditional";
        logger.trace(methodName, null, messages.fetch("enter"));
        DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
        if(workMap.size() == 0) {
            logger.info(methodName, null, messages.fetch("make allocation for JD"));
            init();
        }
        else {
            logger.info(methodName, null, messages.fetch("bypass allocation for JD"));
        }
        logger.trace(methodName, null, messages.fetch("exit"));
    }
}