/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/
package org.apache.uima.ducc.ws.server;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.uima.ducc.common.CancelReasons.CancelReason;
import org.apache.uima.ducc.common.json.MonitorInfo;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
import org.apache.uima.ducc.transport.event.cli.SpecificationProperties;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IRationale;
import org.apache.uima.ducc.ws.DuccData;
import org.apache.uima.ducc.ws.authentication.DuccAsUser;

/**
 * Tracks the reservations reported in Orchestrator state events on behalf of
 * reservation monitors, and issues an automatic cancel for reservations whose
 * monitor has not renewed within the configured timeout.
 * <p>
 * Three maps are maintained, all keyed by reservation id:
 * <ul>
 * <li>{@code mMap} - the {@link MonitorInfo} (state sequence, nodes, rationale) per reservation;</li>
 * <li>{@code tMap} - auto-cancel tracking (expiry time and owning user) for
 *     reservations that requested cancel-on-interrupt;</li>
 * <li>{@code cMap} - the time at which an automatic cancel was issued.</li>
 * </ul>
 * The maps are concurrent maps and are read and written from several methods
 * of this class; code here therefore never assumes that a key observed a
 * moment ago is still present.
 */
public class DuccWebMonitorReservation {
	
	private static final DuccLogger duccLogger = DuccLoggerComponents.getWsLogger(DuccWebMonitorReservation.class.getName());
	// Null id handed to the logger for messages not tied to one reservation.
	private static final DuccId jobid = null;
	
	private final ConcurrentHashMap<DuccId,MonitorInfo> mMap = new ConcurrentHashMap<DuccId,MonitorInfo>();
	private final ConcurrentHashMap<DuccId,TrackingInfo> tMap = new ConcurrentHashMap<DuccId,TrackingInfo>();
	private final ConcurrentHashMap<DuccId,Long> cMap = new ConcurrentHashMap<DuccId,Long>();
	
	private final long millisPerMinute = 60*1000;
	private final long timeoutMillis;
	
	/**
	 * @param timeoutMillis how long (in milliseconds) an auto-cancel reservation
	 *                      may go without a {@link #renew} before it expires
	 */
	protected DuccWebMonitorReservation(long timeoutMillis) {
		this.timeoutMillis = timeoutMillis;
	}
	
	/**
	 * Synchronizes the monitor maps with the reservations in the given event.
	 * <p>
	 * Reservations seen for the first time get a {@link MonitorInfo} and, when
	 * they request cancel-on-interrupt, an auto-cancel tracking entry. For every
	 * reservation present the state sequence and rationale are refreshed.
	 * Reservations that were tracked but are no longer in the event are dropped
	 * from {@code mMap} and {@code tMap}.
	 *
	 * @param duccEvent the Orchestrator state event carrying the current work map
	 */
	protected void monitor(OrchestratorStateDuccEvent duccEvent) {
		String location = "monitor";
		duccLogger.trace(location, jobid, "enter");
		
		IDuccWorkMap dwm = duccEvent.getWorkMap();
		int size = dwm.getReservationKeySet().size();
		duccLogger.debug(location, jobid, "reservations: "+size);
		
		// Assume every tracked reservation is gone; ids still present in the
		// event are struck from this list below.
		ArrayList<DuccId> gone = new ArrayList<DuccId>(mMap.keySet());
		
		long expiryMillis = System.currentTimeMillis()+timeoutMillis+1;
		
		Iterator<DuccId> iterator = dwm.getReservationKeySet().iterator();
		while( iterator.hasNext() ) {
			DuccId duccId = iterator.next();
			IDuccWork dw = (IDuccWork)dwm.findDuccWork(duccId);
			gone.remove(duccId);
			// The id came from the reservation key set, but guard against the
			// work item being missing or of an unexpected type rather than
			// failing the whole pass with an NPE/ClassCastException.
			if(!(dw instanceof DuccWorkReservation)) {
				duccLogger.warn(location, duccId, "reservation not found in work map");
				continue;
			}
			DuccWorkReservation dwr = (DuccWorkReservation) dw;
			
			MonitorInfo monitorInfo = mMap.get(duccId);
			if(monitorInfo == null) {
				MonitorInfo created = new MonitorInfo();
				MonitorInfo existing = mMap.putIfAbsent(duccId, created);
				monitorInfo = (existing != null) ? existing : created;
				duccLogger.info(location, duccId, "monitor start");
				if(!tMap.containsKey(duccId)) {
					if(dw.isCancelOnInterrupt()) {
						TrackingInfo ti = new TrackingInfo();
						ti.time = expiryMillis;
						ti.user = dw.getStandardInfo().getUser();
						tMap.putIfAbsent(duccId,ti);
						duccLogger.info(location, duccId, "auto-cancel on");
					}
					else {
						duccLogger.info(location, duccId, "auto-cancel off");
					}
				}
			}
			
			updateStateSequence(location, duccId, monitorInfo, dwr);
			updateRationale(monitorInfo, dwr);
		}
		
		for(DuccId duccId : gone) {
			mMap.remove(duccId);
			tMap.remove(duccId);
			duccLogger.info(location, duccId, "monitor stop");
		}
		
		duccLogger.trace(location, jobid, "exit");
	}
	
	/**
	 * Appends the reservation's current state to the monitor's state sequence
	 * if that state has not been recorded yet. A null state is ignored.
	 */
	private void updateStateSequence(String location, DuccId duccId, MonitorInfo monitorInfo, DuccWorkReservation dwr) {
		ReservationState reservationState = dwr.getReservationState();
		if(reservationState != null) {
			String state = reservationState.toString();
			ArrayList<String> stateSequence = monitorInfo.stateSequence;
			if(!stateSequence.contains(state)) {
				duccLogger.info(location, duccId, "state: "+state);
				stateSequence.add(state);
			}
		}
	}
	
	/**
	 * Copies the reservation's explanatory text into the monitor info. A
	 * specified completion rationale takes precedence over the RM reason; when
	 * neither is available the existing rationale is left untouched.
	 */
	private void updateRationale(MonitorInfo monitorInfo, DuccWorkReservation dwr) {
		String text = dwr.getRmReason();
		IRationale rationale = dwr.getCompletionRationale();
		if(rationale != null && rationale.isSpecified()) {
			text = rationale.getText();
		}
		if(text != null) {
			monitorInfo.rationale = text;
		}
	}
	
	/**
	 * Returns the exit code of the first process in the map as a string.
	 *
	 * @param map processes keyed by id; may be null
	 * @return the first process's exit code, or {@code "?"} when the map is
	 *         null, empty, or its first value is null
	 */
	protected String getCode(Map<DuccId, IDuccProcess> map) {
		String code = "?";
		if(map != null) {
			Iterator<IDuccProcess> iterator = map.values().iterator();
			if(iterator.hasNext()) {
				IDuccProcess process = iterator.next();
				if(process != null) {
					code = ""+process.getProcessExitCode();
				}
			}
		}
		return code;
	}
	
	/**
	 * Looks up the tracked reservation id whose friendly id, rendered as a
	 * string, equals the given text.
	 *
	 * @param jobId the friendly id in string form
	 * @return the matching tracked id, or null when none matches
	 */
	protected DuccId getKey(String jobId) {
		DuccId retVal = null;
		for(DuccId duccId : mMap.keySet()) {
			String mapId = ""+duccId.getFriendly();
			if(mapId.equals(jobId)) {
				retVal = duccId;
				break;
			}
		}
		return retVal;
	}
	
	/**
	 * Handles a monitor "ping" for a reservation: extends its auto-cancel expiry
	 * (if it has one) and returns its current monitor information.
	 * <p>
	 * If the reservation is not yet tracked, this call waits - polling once per
	 * second - until it appears, until the update counter has advanced by more
	 * than 2 since arrival, until about three minutes have elapsed, or until the
	 * calling thread is interrupted, whichever comes first.
	 * <p>
	 * When the reservation cannot be found at all, the returned
	 * {@link MonitorInfo} is a freshly constructed instance.
	 *
	 * @param jobId         the reservation's friendly id in string form
	 * @param updateCounter counter read to detect that new state has arrived
	 *                      while waiting
	 * @return the monitor information for the reservation; never null
	 */
	public MonitorInfo renew(String jobId, AtomicInteger updateCounter) {
		String location = "renew";
		duccLogger.trace(location, jobid, "enter");
		
		MonitorInfo monitorInfo = new MonitorInfo();
		
		int countAtArrival = updateCounter.get();
		int sleepSecondsMax = 3*60;
		
		DuccId duccId = getKey(jobId);
		
		if(duccId == null) {
			int sleepSeconds = 0;
			duccLogger.info(location, duccId, "Waiting for update...");
			while(duccId == null) {
				duccLogger.debug(location, duccId, "Waiting continues...");
				try {
					Thread.sleep(1000);
				}
				catch(InterruptedException e) {
					// Preserve the interrupt for the caller and stop waiting
					// instead of silently spinning.
					Thread.currentThread().interrupt();
					break;
				}
				sleepSeconds += 1;
				if(sleepSeconds > sleepSecondsMax) {
					break;
				}
				int countAtPresent = updateCounter.get();
				if((countAtPresent-countAtArrival) > 2) {
					break;
				}
				duccId = getKey(jobId);
			}
			duccLogger.info(location, duccId, "Waiting complete.");
			duccId = getKey(jobId);
		}
		
		if(duccId != null) {
			// The entry may have been removed since getKey() saw it; keep the
			// fresh MonitorInfo in that case.
			MonitorInfo tracked = mMap.get(duccId);
			if(tracked != null) {
				monitorInfo = tracked;
			}
			// Single get() instead of containsKey()+get() so a concurrent
			// removal cannot yield a null TrackingInfo.
			TrackingInfo ti = tMap.get(duccId);
			if(ti != null) {
				ti.time = System.currentTimeMillis()+timeoutMillis+1;
				duccLogger.info(location, duccId, "auto-cancel expiry extended");
			}
		}
		else {
			try {
				int iJobId = Integer.parseInt(jobId);
				duccId = new DuccId(iJobId);
				duccLogger.info(location, duccId, "not found");
			}
			catch(Exception e) {
				duccLogger.error(location, jobid, e);
			}
		}
		
		IDuccWork dw = null;
		if(duccId != null) {
			DuccData duccData = DuccData.getInstance();
			dw = duccData.getReservation(duccId);
		}
		
		if(dw instanceof DuccWorkReservation) {
			DuccWorkReservation dwr = (DuccWorkReservation) dw;
			
			updateStateSequence(location, duccId, monitorInfo, dwr);
			
			monitorInfo.nodes = dwr.getNodes();
			
			if(monitorInfo.nodes != null) {
				StringBuilder sb = new StringBuilder();
				for(String node : monitorInfo.nodes) {
					sb.append(node);
					sb.append(" ");
				}
				duccLogger.debug(location, duccId, "nodes: "+sb);
			}
			
			updateRationale(monitorInfo, dwr);
		}
		else {
			// Unknown id or reservation no longer held: return what we have
			// rather than failing with a NullPointerException.
			duccLogger.debug(location, duccId, "reservation not available");
		}
		
		duccLogger.trace(location, jobid, "exit");
		
		return monitorInfo;
	}
	
	/**
	 * Returns the whole minutes remaining before the reservation is
	 * auto-canceled.
	 *
	 * @param duccId the reservation id
	 * @return minutes left (0 once expired), or null when the reservation has
	 *         already been canceled, is not cancelable, or has no auto-cancel
	 *         tracking entry
	 */
	protected Long getExpiry(DuccId duccId) {
		String location = "getExpiry";
		duccLogger.trace(location, duccId, "enter");
		Long retVal = null;
		if(!isCanceled(duccId)) {
			if(isCancelable(duccId)) {
				// get() yields null for an absent key, same as the
				// containsKey()/get() pair it replaces.
				retVal = getExpiryMap().get(duccId);
			}
		}
		duccLogger.trace(location, duccId, "exit");
		return retVal;
	}
	
	/**
	 * Builds a snapshot of the time remaining for every reservation that has
	 * an auto-cancel tracking entry.
	 *
	 * @return a new map from reservation id to whole minutes left before expiry
	 *         (0 when the expiry time has been reached or passed)
	 */
	public ConcurrentHashMap<DuccId,Long> getExpiryMap() {
		String location = "getExpiryMap";
		duccLogger.trace(location, jobid, "enter");
		
		ConcurrentHashMap<DuccId,Long> eMap = new ConcurrentHashMap<DuccId,Long>();
		
		long nowMillis = System.currentTimeMillis();
		
		// Iterate entries so key and value are obtained together; a key
		// enumeration followed by get() can observe a concurrently removed
		// entry as null.
		for(Map.Entry<DuccId,TrackingInfo> entry : tMap.entrySet()) {
			long minutesLeft = 0;
			long expiryMillis = entry.getValue().time;
			if(nowMillis < expiryMillis) {
				minutesLeft = (expiryMillis - nowMillis) / millisPerMinute;
			}
			eMap.put(entry.getKey(), minutesLeft);
		}
		
		duccLogger.trace(location, jobid, "exit");
		
		return eMap;
	}

	/**
	 * @param duccId the reservation id
	 * @return true when an automatic cancel has been issued for this reservation
	 */
	protected boolean isCanceled(DuccId duccId) {
		return cMap.containsKey(duccId);
	}
	
	/**
	 * Decides whether an automatic cancel may be issued for the reservation:
	 * it must not have been canceled already, must be tracked, and its recorded
	 * state sequence must not contain a completing/completed state.
	 */
	private boolean isCancelable(DuccId duccId) {
		String location = "isCancelable";
		duccLogger.trace(location, duccId, "enter");
		boolean retVal = false;
		if(!cMap.containsKey(duccId)) {
			MonitorInfo monitorInfo = mMap.get(duccId);
			if(monitorInfo != null) {
				ArrayList<String> stateSequence = monitorInfo.stateSequence;
				if(stateSequence != null) {
					// States are compared by name. NOTE(review): JobState
					// constants are used although the sequence is filled from
					// ReservationState values - confirm the names agree.
					boolean finishing = stateSequence.contains(JobState.Completing.toString())
							|| stateSequence.contains(JobState.Completed.toString());
					if(finishing) {
						duccLogger.debug(location, duccId, "state: <uncancelable> "+stateSequence);
					}
					else {
						duccLogger.debug(location, duccId, "state: <cancelable> "+stateSequence);
						retVal = true;
					}
				}
				else {
					duccLogger.warn(location, duccId, "stateSequence: <null>");
				}
			}
			else {
				duccLogger.warn(location, duccId, "monitorInfo: <null>");
			}
		}
		else {
			duccLogger.debug(location, duccId, "already canceled");
		}
		duccLogger.trace(location, duccId, "exit");
		return retVal;
	}

	/**
	 * Cancels the reservation by running the reservation-cancel CLI class as
	 * the given user, with reason {@code CancelReason.MonitorPingOverdue}. The
	 * reservation is then recorded as canceled and its auto-cancel tracking
	 * entry removed, regardless of the command's reported result (which is
	 * only logged).
	 *
	 * @param duccId the reservation to cancel
	 * @param userId the user on whose behalf the cancel command is run
	 */
	protected void cancel(DuccId duccId, String userId) {
		String location = "cancel";
		duccLogger.trace(location, jobid, "enter");
		
		duccLogger.info(location, duccId, userId);
		
		String java = "/bin/java";
		String jhome = System.getProperty("java.home");
		String cp = System.getProperty("java.class.path");
		String jclass = "org.apache.uima.ducc.cli.DuccReservationCancel";
		String arg1 = "--"+JobRequestProperties.key_id;
		String arg2 = ""+duccId;
		String arg3 = "--"+SpecificationProperties.key_reason;
		String reason = CancelReason.MonitorPingOverdue.getText();
		String arg4 = "\""+reason+"\"";
		
		String[] arglistUser = { "-u", userId, "--", jhome+java, "-cp", cp, jclass, arg1, arg2, arg3, arg4 };
		String result = DuccAsUser.duckling(userId, arglistUser);
		duccLogger.warn(location, duccId, result);
		
		cMap.put(duccId, Long.valueOf(System.currentTimeMillis()));
		tMap.remove(duccId);

		duccLogger.trace(location, jobid, "exit");
	}
	
	/**
	 * Cancels every auto-cancel reservation whose expiry time is earlier than
	 * the given time and which is still cancelable.
	 *
	 * @param nowMillis the current time in milliseconds, compared against each
	 *                  tracked expiry time
	 */
	protected void canceler(long nowMillis) {
		String location = "canceler";
		duccLogger.trace(location, jobid, "enter");

		// Entry iteration keeps key and value together, so an entry removed
		// concurrently (including by cancel() below) cannot surface as a null
		// TrackingInfo.
		for(Map.Entry<DuccId,TrackingInfo> entry : tMap.entrySet()) {
			DuccId duccId = entry.getKey();
			TrackingInfo ti = entry.getValue();
			long expiryMillis = ti.time;
			if(nowMillis > expiryMillis) {
				if(isCancelable(duccId)) {
					cancel(duccId, ti.user);
				}
				else {
					duccLogger.debug(location, duccId, "not cancelable");
				}
			}
		}
		
		duccLogger.trace(location, jobid, "exit");
	}
	
}
