/*
 * Copyright 2009-2013 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.pregelix.dataflow.context;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
import edu.uci.ics.pregelix.api.graph.VertexContext;

/**
 * Application-level runtime state shared by the Pregelix operators running in one process.
 * <p>
 * An instance owns the storage infrastructure (buffer cache, virtual buffer caches, file map
 * manager, index lifecycle manager, local resource repository) and tracks one {@link PJobContext}
 * per active job, keyed by job id. Operators obtain the instance through
 * {@link #get(IHyracksTaskContext)}, which expects it to be installed as the application object.
 * <p>
 * Job contexts live in a {@link ConcurrentHashMap}; methods that create, mutate or remove job
 * state additionally serialize on this object's monitor.
 */
public class RuntimeContext implements IWorkspaceFileFactory {

    /** Time, in milliseconds, that {@link #close()} waits before closing the buffer cache. */
    private static final int SHUTDOWN_GRACEFUL_PERIOD = 5000;
    private final IIndexLifecycleManager lcManager;
    private final ILocalResourceRepository localResourceRepository;
    private final ResourceIdFactory resourceIdFactory;
    private final IBufferCache bufferCache;
    private final List<IVirtualBufferCache> vbcs;
    private final IFileMapManager fileMapManager;
    private final IOManager ioManager;
    /** Job id to per-job context; entries are created lazily and removed by clearState/close. */
    private final Map<String, PJobContext> activeJobs = new ConcurrentHashMap<String, PJobContext>();
    private final int vFrameSize;

    /** Plain thread factory handed to the buffer cache. */
    private final ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    };

    /**
     * Builds the storage infrastructure for this process.
     * <p>
     * The buffer cache uses pages of {@code vFrameSize} bytes and is sized to a quarter of the
     * JVM's maximum heap; the single virtual buffer cache gets one eighth of that page count.
     *
     * @param appCtx
     *            application context whose root context supplies the IO manager
     * @param vFrameSize
     *            frame size in bytes; also used as the buffer cache page size
     */
    public RuntimeContext(INCApplicationContext appCtx, int vFrameSize) {
        this.vFrameSize = vFrameSize;
        int pageSize = vFrameSize;
        long memSize = Runtime.getRuntime().maxMemory();
        long bufferSize = memSize / 4;
        int numPages = (int) (bufferSize / pageSize);

        // Resolve (and cast) the IO manager once, before anything that depends on it is built.
        ioManager = (IOManager) appCtx.getRootContext().getIOManager();
        fileMapManager = new TransientFileMapManager();
        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
        // Let the buffer cache never flush dirty pages.
        // NOTE(review): 1000000 is presumably the max number of open files - confirm against BufferCache.
        bufferCache = new BufferCache(ioManager, prs, new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager,
                1000000, threadFactory);
        int numPagesInMemComponents = numPages / 8;
        vbcs = new ArrayList<IVirtualBufferCache>();
        IVirtualBufferCache vBufferCache = new MultitenantVirtualBufferCache(new VirtualBufferCache(
                new HeapBufferAllocator(), pageSize, numPagesInMemComponents));
        vbcs.add(vBufferCache);
        lcManager = new NoBudgetIndexLifecycleManager();
        localResourceRepository = new TransientLocalResourceRepository();
        resourceIdFactory = new ResourceIdFactory(0);
    }

    /**
     * Closes every active job context, waits for the shutdown grace period and then closes the
     * buffer cache.
     * <p>
     * If the calling thread is interrupted during the grace period, the wait ends early and the
     * thread's interrupt status is restored before this method returns.
     *
     * @throws HyracksDataException
     *             if closing a job context fails
     */
    public synchronized void close() throws HyracksDataException {
        for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
            entry.getValue().close();
        }
        activeJobs.clear();
        // wait a graceful period until all active operators using tree cursors are dead
        boolean interrupted = false;
        try {
            wait(SHUTDOWN_GRACEFUL_PERIOD);
        } catch (InterruptedException e) {
            // Remember the interrupt instead of swallowing it; shutdown still has to finish.
            interrupted = true;
        }
        try {
            bufferCache.close();
        } finally {
            // Restored only after the buffer cache is closed, so a pending interrupt cannot
            // disturb any blocking the close itself may do.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** @return the local resource repository created by this context */
    public ILocalResourceRepository getLocalResourceRepository() {
        return localResourceRepository;
    }

    /** @return the resource id factory created by this context */
    public ResourceIdFactory getResourceIdFactory() {
        return resourceIdFactory;
    }

    /** @return the index lifecycle manager created by this context */
    public IIndexLifecycleManager getIndexLifecycleManager() {
        return lcManager;
    }

    /** @return the buffer cache owned by this context */
    public IBufferCache getBufferCache() {
        return bufferCache;
    }

    /** @return the list of virtual buffer caches; this is the internal list, not a copy */
    public List<IVirtualBufferCache> getVirtualBufferCaches() {
        return vbcs;
    }

    /** @return the file map manager used by the buffer cache */
    public IFileMapProvider getFileMapManager() {
        return fileMapManager;
    }

    /**
     * Returns the application state store of the given job.
     *
     * @param jobId
     *            id of the job
     * @return the state store held by the job's context
     * @throws IllegalStateException
     *             if no context is registered for {@code jobId}
     */
    public synchronized Map<TaskIterationID, IStateObject> getAppStateStore(String jobId) {
        return requireActiveJob(jobId).getAppStateStore();
    }

    /**
     * Looks up the runtime context installed as the application object of the task's joblet.
     *
     * @param ctx
     *            task context of the calling operator
     * @return the application object, cast to {@link RuntimeContext}
     */
    public static RuntimeContext get(IHyracksTaskContext ctx) {
        return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
    }

    /**
     * Sets the vertex properties of a job, creating the job's context if it does not exist yet.
     *
     * @param jobId
     *            id of the job
     * @param numVertices
     *            vertex count passed through to the job context
     * @param numEdges
     *            edge count passed through to the job context
     * @param currentIteration
     *            iteration number passed through to the job context
     * @param cl
     *            class loader passed through to the job context
     */
    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
            ClassLoader cl) {
        getOrCreateActiveJob(jobId).setVertexProperties(numVertices, numEdges, currentIteration, cl);
    }

    /**
     * Recovers the vertex properties of an already registered job.
     *
     * @param jobId
     *            id of the job
     * @param numVertices
     *            vertex count passed through to the job context
     * @param numEdges
     *            edge count passed through to the job context
     * @param currentIteration
     *            iteration number passed through to the job context
     * @param cl
     *            class loader passed through to the job context
     * @throws IllegalStateException
     *             if no context is registered for {@code jobId}
     */
    public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
            long currentIteration, ClassLoader cl) {
        requireActiveJob(jobId).recoverVertexProperties(numVertices, numEdges, currentIteration, cl);
    }

    /**
     * Signals the end of the current superstep to the job's context.
     *
     * @param jobId
     *            id of the job
     * @throws IllegalStateException
     *             if no context is registered for {@code jobId}
     */
    public synchronized void endSuperStep(String jobId) {
        requireActiveJob(jobId).endSuperStep();
    }

    /**
     * Clears the state of a job; a job without a registered context is ignored.
     *
     * @param jobId
     *            id of the job
     * @param allStates
     *            when {@code true}, the job's context is also unregistered
     * @throws HyracksDataException
     *             if clearing the job's state fails
     */
    public synchronized void clearState(String jobId, boolean allStates) throws HyracksDataException {
        PJobContext activeJob = getActiveJob(jobId);
        if (activeJob == null) {
            return;
        }
        activeJob.clearState();
        if (allStates) {
            activeJobs.remove(jobId);
        }
    }

    /**
     * Returns the superstep recorded in the job's vertex context.
     *
     * @param jobId
     *            id of the job
     * @return the job's superstep, or {@code 0} if no context is registered for {@code jobId}
     */
    public long getSuperstep(String jobId) {
        PJobContext activeJob = getActiveJob(jobId);
        return activeJob == null ? 0 : activeJob.getVertexContext().getSuperstep();
    }

    /** @return the frame size, in bytes, this context was constructed with */
    public int getVFrameSize() {
        return vFrameSize;
    }

    /**
     * Attaches a Hadoop task attempt context to the job's vertex context, creating the job's
     * context if it does not exist yet.
     *
     * @param jobId
     *            id of the job
     * @param tCtx
     *            task attempt context to attach
     */
    public void setJobContext(String jobId, TaskAttemptContext tCtx) {
        PJobContext activeJob = getOrCreateActiveJob(jobId);
        activeJob.getVertexContext().setContext(tCtx);
    }

    /**
     * Returns the vertex context of the given job.
     *
     * @param jobId
     *            id of the job
     * @return the vertex context held by the job's context
     * @throws IllegalStateException
     *             if no context is registered for {@code jobId}
     */
    public VertexContext getVertexContext(String jobId) {
        return requireActiveJob(jobId).getVertexContext();
    }

    /** Returns the context registered for {@code jobId}, or {@code null} if there is none. */
    private PJobContext getActiveJob(String jobId) {
        return activeJobs.get(jobId);
    }

    /**
     * Returns the context registered for {@code jobId}, failing with a descriptive
     * {@link IllegalStateException} (rather than a bare NullPointerException at the call site)
     * if there is none.
     */
    private PJobContext requireActiveJob(String jobId) {
        PJobContext activeJob = activeJobs.get(jobId);
        if (activeJob == null) {
            throw new IllegalStateException("No active job context is registered for job " + jobId);
        }
        return activeJob;
    }

    /**
     * Returns the context registered for {@code jobId}, registering a new one if there is none.
     * <p>
     * Synchronized because the lookup-then-insert is not atomic on its own and
     * {@link #setJobContext(String, TaskAttemptContext)} calls this without holding the monitor;
     * without it two threads could each register a context and one would be silently replaced.
     */
    private synchronized PJobContext getOrCreateActiveJob(String jobId) {
        PJobContext activeJob = activeJobs.get(jobId);
        if (activeJob == null) {
            activeJob = new PJobContext();
            activeJobs.put(jobId, activeJob);
        }
        return activeJob;
    }

    /**
     * Creates a workspace file and records it under the job's current superstep so the job
     * context can manage it later.
     *
     * @param jobId
     *            id of the job; also used as the workspace file prefix
     * @return reference to the newly created file
     * @throws HyracksDataException
     *             if no context is registered for {@code jobId}, or if the file cannot be created
     */
    @Override
    public FileReference createManagedWorkspaceFile(String jobId) throws HyracksDataException {
        // Look the job up first so an unknown job fails before a file is created and left untracked.
        PJobContext activeJob = getActiveJob(jobId);
        if (activeJob == null) {
            throw new HyracksDataException("No active job context is registered for job " + jobId
                    + "; cannot create a managed workspace file");
        }
        final FileReference fRef = ioManager.createWorkspaceFile(jobId);
        // The per-superstep list is a plain ArrayList and the lookup-then-insert below is not
        // atomic, so do the bookkeeping under the same monitor the other job-state mutators of
        // this class use. File creation itself stays outside the lock.
        synchronized (this) {
            long superstep = activeJob.getVertexContext().getSuperstep();
            List<FileReference> files = activeJob.getIterationToFiles().get(superstep);
            if (files == null) {
                files = new ArrayList<FileReference>();
                activeJob.getIterationToFiles().put(superstep, files);
            }
            files.add(fRef);
        }
        return fRef;
    }

    /**
     * Creates a workspace file that is not tracked by any job context.
     *
     * @param prefix
     *            workspace file prefix
     * @return reference to the newly created file
     * @throws HyracksDataException
     *             if the file cannot be created
     */
    @Override
    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
        return ioManager.createWorkspaceFile(prefix);
    }

}
