/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/
package org.apache.uima.ducc.orchestrator.jd.scheduler;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.SizeBytes;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.orchestrator.OrchestratorCheckpoint;
import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
import org.apache.uima.ducc.orchestrator.ReservationFactory;
import org.apache.uima.ducc.orchestrator.WorkMapHelper;
import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.JdReservationBean;
import org.apache.uima.ducc.transport.event.common.Rationale;

public class JdScheduler {
	
	private static DuccLogger logger = new DuccLogger(JdScheduler.class);
	private static DuccId jobid = null;
	
	private static ReservationFactory reservationFactory = ReservationFactory.getInstance();
	
	private static JdScheduler instance = new JdScheduler();
	
	public static JdScheduler getInstance() {
		return instance;
	}
	
	private ConcurrentHashMap<DuccId,JdReservation> map = new ConcurrentHashMap<DuccId,JdReservation>();
	
	private AtomicBoolean autoManage = new AtomicBoolean(true);
	private AtomicBoolean requestPending = new AtomicBoolean(false);
	private AtomicBoolean changes = new AtomicBoolean(false);
	
	// Manage the allocation of JD slices, which comprises making reservations,
	// subdividing said reservations into slices, one per JD, and returning 
	// unused reservations.
	
	public JdScheduler() {	
	}
	
	// Auto-manage true is nominal.
	
	public void setAutomanage() {
		autoManage.set(true);
	}
	
	// Auto-manage false is for testing only.
	
	public void resetAutomanage() {
		autoManage.set(false);
	}
	
	// Save current slice allocations within each Reservation comprising
	// the Orchestrator's checkpoint map.
	
	public void ckpt() {
		String location = "ckpt";
		if(changes.get()) {
			changes.set(false);
			try {
				IDuccWorkMap dwm = OrchestratorCommonArea.getInstance().getWorkMap();
				for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
					DuccId jdReservationDuccId = entry.getKey();
					DuccId duccId = (DuccId) jdReservationDuccId;
					IDuccWork dw = dwm.findDuccWork(duccId);
					if(dw instanceof IDuccWorkReservation) {
						IDuccWorkReservation dwr = (IDuccWorkReservation) dw;
						List<JdReservationBean> jdReservationBeanList = getJdReservationBeanList(jdReservationDuccId);
						dwr.setJdReservationBeanList(jdReservationBeanList);
						if(jdReservationBeanList != null) {
							logger.debug(location, duccId, "size: "+jdReservationBeanList.size());
						}
						else {
							logger.debug(location, duccId, "size: "+null);
						}
					}
				}
			}
			catch(Exception e) {
				logger.error(location, jobid, e);
			}
			OrchestratorCheckpoint.getInstance().saveState();
		}
	}
	
	// Restore current slice allocations within each Reservation comprising
	// the Orchestrator's checkpoint map.
	
	public void restore() {
		String location = "restore";
		try {
			IDuccWorkMap dwm = OrchestratorCommonArea.getInstance().getWorkMap();
			for(Entry<DuccId, IDuccWork> entry : dwm.getMap().entrySet()) {
				IDuccWork dw = entry.getValue();
				if(dw instanceof IDuccWorkReservation) {
					IDuccWorkReservation dwr = (IDuccWorkReservation) dw;
					List<JdReservationBean> jdReservationBeanList = dwr.getJdReservationBeanList();
					if(jdReservationBeanList != null) {
						setJdReservationBeanList(jdReservationBeanList);
					}
				}
			}
		}
		catch(Exception e) {
			logger.error(location, jobid, e);
		}
	}
	
	// Return the number of Reservations allocated for JDs.
	
	public int getReservationCount() {
		int count = 0;
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			switch(jdReservation.getReservationState()) {
			case Assigned:
				count += 1;
				break;
			default:
				break;
			}
		}
		return count;
	}
	
	// Process an OR publication.
	
	public void handle(IDuccWorkMap dwm) {
		String location = "handle";
		try {
			if(dwm != null) {
				logger.debug(location, jobid, "dwm size: "+dwm.size());
				JdHostProperties jdHostProperties = new JdHostProperties();
				resourceAccounting(dwm, jdHostProperties);
				resourceAdjustment(dwm, jdHostProperties);
				ckpt();
			}
			else {
				logger.debug(location, jobid, "dwm: null");
			}
		}
		catch(Exception e) {
			logger.error(location, jobid, e);
		}
	}
	
	// Account for state changes for JD Reservations.
	
	private void resourceAccounting(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "resourceAccounting";
		String jdHostClass = jdHostProperties.getHostClass();
		boolean pendingFlag = false;
		if(jdHostClass != null) {
			Set<DuccId> known = new HashSet<DuccId>();
			known.addAll(map.keySet());
			for(Entry<DuccId, IDuccWork> entry : dwm.getMap().entrySet()) {
				IDuccWork dw = entry.getValue();
				if(dw instanceof IDuccWorkReservation) {
					IDuccWorkReservation dwr = (IDuccWorkReservation) dw;
					IDuccSchedulingInfo schedulingInfo = dwr.getSchedulingInfo();
					if(schedulingInfo != null) {
						String schedulingClass = schedulingInfo.getSchedulingClass();
						if(schedulingClass != null) {
							if(schedulingClass.equals(jdHostClass)) {
								ReservationState reservationState = dwr.getReservationState();
								switch(reservationState) {
								case Assigned:
									reservationUp(dwr, jdHostProperties);
									break;
								case Completed:
									reservationDown(dwr);
									break;
								case Received:	
								case Undefined:
									reservationOther(dwr, jdHostProperties);
									break;
								case WaitingForResources:
									pendingFlag = true;
									break;
								default:
									reservationOther(dwr, jdHostProperties);
									break;
								}
							}
						}
					}
					known.remove(entry.getKey());
				}
			}
			for(DuccId jdReservationDuccId : known) {
				reservationVanished(jdReservationDuccId);
			}
		}
		if(pendingFlag) {
			requestPending.set(true);
		}
		else {
			requestPending.set(false);
		}
		logger.trace(location, jobid, "total: "+countReservationsTotal()+" "+"up: "+countReservationsUp());
	}
	
	private long getSlicesReserveDesired(JdHostProperties jdHostProperties) {
		long slicesReserveDesired = JdHelper.getSlicesReserve(jdHostProperties);
		return slicesReserveDesired;
	}
	
	private long getSlicesReserveActual() {
		long slicesReserveActual = countSlicesAvailable();
		return slicesReserveActual;
	}
	
	// Determine if at least one JD Reservation can be unreserved.
	
	private boolean isReservationDivestable(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		boolean retVal = false;
		long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
		long slicesReserveActual = getSlicesReserveActual();
		if(map.size() > 1) {
			synchronized(this) {
				for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
					JdReservation jdReservation = entry.getValue();
					if(jdReservation.isEmpty()) {
						long slicesToRelease = jdReservation.getSlicesTotal();
						long slicesReserveAfterRelease = slicesReserveActual - slicesToRelease;
						if(slicesReserveAfterRelease > slicesReserveDesired) {
							retVal = true;
							break;
						}
					}
				}
			}
		}
		return retVal;
	}
	
	// Acquire or divest JD Reservations based upon need.
	
	private void resourceAdjustment(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "resourceAdjustment";
		if(autoManage.get()) {
			long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
			long slicesReserveActual = getSlicesReserveActual();
			logger.debug(location, jobid, "actual: "+slicesReserveActual+" "+"desired: "+slicesReserveDesired);
			if(slicesReserveActual < slicesReserveDesired) {
				if(requestPending.get()) {
					reservationPending(dwm, jdHostProperties);
				}
				else {
					reservationAcquire(dwm, jdHostProperties);
				}
			}
			else if(isReservationDivestable(dwm, jdHostProperties)) {
				while(isReservationDivestable(dwm, jdHostProperties)) {
					reservationDivest(dwm, jdHostProperties);
				}
			}
			else {
				reservationNoChange(dwm, jdHostProperties);
			}
		}
		else {
			logger.debug(location, jobid, "automanage: "+autoManage.get());
		}
	}
	
	private void reservationPending(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "reservationPending";
		long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
		long slicesReserveActual = getSlicesReserveActual();
		logger.debug(location, jobid, "actual: "+slicesReserveActual+" "+"desired: "+slicesReserveDesired);
	}
	
	// Request a new JD Reservation.
	
	private void reservationAcquire(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "reservationAcquire";
		CommonConfiguration common = null;
		ReservationRequestProperties reservationRequestProperties = new ReservationRequestProperties();
		//
		String key;
		String value;
		//
		key = ReservationSpecificationProperties.key_scheduling_class;
		value = jdHostProperties.getHostClass();
		reservationRequestProperties.setProperty(key, value);
		//
		key = ReservationSpecificationProperties.key_memory_size;
		value = jdHostProperties.getHostMemorySize();
		reservationRequestProperties.setProperty(key, value);
		//
		key = ReservationSpecificationProperties.key_user;
		value = jdHostProperties.getHostUser();
		reservationRequestProperties.setProperty(key, value);
		//
		key = ReservationSpecificationProperties.key_description;
		value = jdHostProperties.getHostDescription();
		reservationRequestProperties.setProperty(key, value);
		//
		DuccWorkReservation dwr = reservationFactory.create(common, reservationRequestProperties);
		dwr.setJdReservation();
		//
		DuccWorkMap workMap = (DuccWorkMap) dwm;
		WorkMapHelper.addDuccWork(workMap, dwr, this, location);
		// state: Received
		dwr.stateChange(ReservationState.Received);
		OrchestratorCheckpoint.getInstance().saveState();
		// state: WaitingForResources
		dwr.stateChange(ReservationState.WaitingForResources);
		OrchestratorCheckpoint.getInstance().saveState();
		//
		long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
		long slicesReserveActual = getSlicesReserveActual();
		DuccId duccId = dwr.getDuccId();
		logger.debug(location, duccId, "actual: "+slicesReserveActual+" "+"desired: "+slicesReserveDesired);
	}
	
	// Return an unused JD Reservation.
	
	private void reservationDivest(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "reservationDivest";
		DuccId jdReservationDuccId = null;
		synchronized(this) {
			for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
				JdReservation jdReservation = entry.getValue();
				if(jdReservation.isEmpty()) {
					jdReservationDuccId = entry.getKey();
					map.remove(jdReservationDuccId);
					break;
				}
				jdReservation = null;
			}
		}
		if(jdReservationDuccId != null) {
			DuccId duccId = (DuccId) jdReservationDuccId;
			IDuccWork dw = dwm.findDuccWork(duccId);
			if(dw != null) {
				IDuccWorkReservation dwr = (IDuccWorkReservation) dw;
				// state: Completed
				dwr.stateChange(ReservationState.Completed);
				dwr.setCompletionType(ReservationCompletionType.CanceledBySystem);
				dwr.setCompletionRationale( new Rationale("excess capacity"));
				OrchestratorCheckpoint.getInstance().saveState();
			}
		}
		long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
		long slicesReserveActual = getSlicesReserveActual();
		logger.debug(location, jobid, "actual: "+slicesReserveActual+" "+"desired: "+slicesReserveDesired);
	}
	
	// Nothing to do.
	
	private void reservationNoChange(IDuccWorkMap dwm, JdHostProperties jdHostProperties) {
		String location = "reservationNoChange";
		long slicesReserveDesired = getSlicesReserveDesired(jdHostProperties);
		long slicesReserveActual = getSlicesReserveActual();
		logger.trace(location, jobid, "actual: "+slicesReserveActual+" "+"desired: "+slicesReserveDesired);
	}
	
	// Update a list of JDs (DuccId's) allocated on a JD Reservation.
	
	private void setJdReservationBeanList(List<JdReservationBean> jdReservationBeanList) {
		if(jdReservationBeanList != null) {
			for(JdReservationBean entry : jdReservationBeanList) {
				JdReservation jdReservation = (JdReservation) entry;
				DuccId jdReservationDuccId = jdReservation.getDuccId();
				map.put(jdReservationDuccId, jdReservation);
			}
		}
	}
	
	// Return a list of JDs (DuccId's) allocated on a JD Reservation.
	
	public List<JdReservationBean> getJdReservationBeanList(DuccId jdReservationDuccId) {
		String location = "getJdReservationBeanList";
		List<JdReservationBean> jdReservationBeanList = new ArrayList<JdReservationBean>();
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			if(jdReservationDuccId.equals(jdReservation.getDuccId())) {
				jdReservationBeanList.add(jdReservation);
				DuccId duccId = (DuccId) jdReservationDuccId;
				logger.trace(location, duccId, jdReservationBeanList.size());
			}
		}
		return jdReservationBeanList;
	}
	
	// Return the number of JD Reservations.
	
	public int countReservationsTotal() {
		int count = map.size();
		return count;
	}
	
	// Return the number of JD Reservations that are "up" (e.g. in Assigned state).
	
	public int countReservationsUp() {
		int count = 0;
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			if(jdReservation.isUp()) {
				count +=1;
			}
		}
		return count;
	}
	
	// Handle a JD Reservation that has become available.
	
	private void reservationUp(IDuccWorkReservation dwr, JdHostProperties jdHostProperties) {
		String location = "reservationUp";
		DuccId duccId = dwr.getDuccId();
		DuccId jdReservationDuccId = (DuccId) duccId;
		JdReservation jdReservation = null;
		SizeBytes reservationSize = JdHelper.getReservationSize(dwr);
		SizeBytes sliceSize = JdHelper.getSliceSize(jdHostProperties);
		jdReservation = map.get(jdReservationDuccId);
		if(jdReservation == null) {
			jdReservation = new JdReservation(dwr, reservationSize, sliceSize);
			map.putIfAbsent(jdReservationDuccId, jdReservation);
		}
		else if(!jdReservation.isUp()) {
			jdReservation = new JdReservation(dwr, reservationSize, sliceSize);
			map.putIfAbsent(jdReservationDuccId, jdReservation);
		}
		jdReservation = map.get(jdReservationDuccId);
		if(jdReservation != null) {
			logger.debug(location, duccId, "host: "+jdReservation.getHost());
		}
	}
	
	// Handle a JD Reservation that has become unavailable.
	
	private void reservationDown(IDuccWorkReservation dwr) {
		String location = "reservationDown";
		DuccId duccId = dwr.getDuccId();
		DuccId jdReservationDuccId = (DuccId) duccId;
		JdReservation jdReservation = null;
		List<JdReservation> list = new ArrayList<JdReservation>();
		synchronized(this) {
			jdReservation = map.get(jdReservationDuccId);
			if(jdReservation != null) {
				map.remove(jdReservationDuccId);
				list.add(jdReservation);
			}
		}
		if(list.size() > 0) {
			logger.info(location, duccId, list.size());
			defunct(list);
		}
	}
	
	// Handle unexpected state for a JD Reservation.
	
	private void reservationOther(IDuccWorkReservation dwr, JdHostProperties jdHostProperties) {
		String location = " reservationOther";
		DuccId duccId = dwr.getDuccId();
		DuccId jdReservationDuccId = (DuccId) duccId;
		JdReservation jdReservation = null;
		SizeBytes reservationSize = JdHelper.getReservationSize(dwr);
		SizeBytes sliceSize = JdHelper.getSliceSize(jdHostProperties);
		jdReservation = map.get(jdReservationDuccId);
		if(jdReservation == null) {
			jdReservation = new JdReservation(dwr, reservationSize, sliceSize);
			map.putIfAbsent(jdReservationDuccId, jdReservation);
		}
		jdReservation = map.get(jdReservationDuccId);
		logger.trace(location, duccId, "total: "+countReservationsTotal()+" "+"up: "+countReservationsUp());
	}
	
	// Handle a JD Reservation that has disappeared for the Orchestrator publication.
	
	private void reservationVanished(DuccId jdReservationDuccId) {
		String location = "reservationVanished";
		List<JdReservation> list = new ArrayList<JdReservation>();
		synchronized(this) {
			JdReservation jdReservation = map.get(jdReservationDuccId);
			if(jdReservation != null) {
				jdReservation = map.remove(jdReservationDuccId);
				list.add(jdReservation);
			}
		}
		if(list.size() > 0) {
			DuccId duccId = (DuccId) jdReservationDuccId;
			logger.info(location, duccId, list.size());
			defunct(list);
		}
	}
	
	// Handle a list of JD Reservations that are no longer viable.
	
	private void defunct(List<JdReservation> list) {
		if(list != null) {
			if(!list.isEmpty()) {
				for(JdReservation jdReservation : list) {
					defunct(jdReservation);
				}
			}
		}
	}
	
	// Handle an individual JD Reservation that is no longer viable.
	
	private void defunct(JdReservation jdReservation) {
		String location = "defunct";
		//TODO phase I  = kill Job
		//TODO phase II = start new JD
		if(jdReservation != null) {
			DuccId duccId = (DuccId) jdReservation.getDuccId();
			logger.debug(location, duccId, "host: "+jdReservation.getHost());
		}
	}
	
	// Get a slice, if one is available.
	
	public NodeIdentity allocate(DuccId jdId, DuccId jobId) {
		String location = "allocate";
		NodeIdentity nodeIdentity = null;
		if(jdId != null) {
			String host = null;
			synchronized(this) {
				for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
					JdReservation jdReservation = entry.getValue();
					nodeIdentity = jdReservation.allocate(jdId, jobId);
					if(nodeIdentity != null) {
						host = nodeIdentity.getName();
						changes.set(true);
						break;
					}
				}
			} 
			if(nodeIdentity != null) {
				logger.debug(location, jobId, "jdId:"+jdId+" "+"host: "+host);
			}
		}
		return nodeIdentity;
	}
	
	// Return a slice.
	
	public void deallocate(DuccId jdId, DuccId jobId) {
		String location = "deallocate";
		NodeIdentity nodeIdentity = null;
		if(jdId != null) {
			String host = null;
			logger.debug(location, jobId, "map size: "+map.size());
			synchronized(this) {
				for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
					JdReservation jdReservation = entry.getValue();
					logger.debug(location, jobId, "get host: "+jdReservation.getHost());
					logger.debug(location, jobId, "jdId: "+jdId);
					nodeIdentity = jdReservation.deallocate(jdId, jobId);
					if(nodeIdentity != null) {
						host = nodeIdentity.getName();
						changes.set(true);
						break;
					}
				}
			}
			if(nodeIdentity != null) {
				logger.debug(location, jobId, "jdId:"+jdId+" "+"host: "+host);
			}
		}
	}
	
	// Return the number of slices total for the specified JD Reservation.
	
	public int countSlicesTotal(DuccId duccId) {
		String location = "countSlicesTotal";
		int count = 0;
		JdReservation jdReservation = map.get(duccId);
		if(jdReservation != null) {
			count += jdReservation.getSlicesTotal();
		}
		logger.trace(location, duccId, count);
		return count;
	}
	
	// Return the number of slices inuse for the specified JD Reservation.
	
	public int countSlicesInuse(DuccId duccId) {
		String location = "countSlicesInuse";
		int count = 0;
		JdReservation jdReservation = map.get(duccId);
		if(jdReservation != null) {
			count += jdReservation.getSlicesInuse();
		}
		logger.trace(location, duccId, count);
		return count;
	}
	
	// Return the number of slices available for the specified JD Reservation.
	
	public int countSlicesAvailable(DuccId duccId) {
		String location = "countSlicesAvailable";
		int count = 0;
		JdReservation jdReservation = map.get(duccId);
		if(jdReservation != null) {
			count += jdReservation.getSlicesAvailable();
		}
		logger.trace(location, duccId, count);
		return count;
	}
	
	// Return the number of slices total (for all JD Reservations).
	
	public int countSlicesTotal() {
		String location = "countSlicesTotal";
		int count = 0;
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			count += jdReservation.getSlicesTotal();
		}
		logger.trace(location, jobid, count);
		return count;
	}
	
	// Return the number of slices inuse (for all JD Reservations).
	
	public int countSlicesInuse() {
		String location = "countSlicesInuse";
		int count = 0;
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			count += jdReservation.getSlicesInuse();
		}
		logger.trace(location, jobid, count);
		return count;
	}
	
	// Return the number of slices available (for all JD Reservations).
	
	public int countSlicesAvailable() {
		String location = "countSlicesAvailable";
		int count = 0;
		for(Entry<DuccId, JdReservation> entry : map.entrySet()) {
			JdReservation jdReservation = entry.getValue();
			count += jdReservation.getSlicesAvailable();
		}
		logger.trace(location, jobid, count);
		return count;
	}

}
