blob: 8d6ab387e267068b9d5076b73c0dc131e7efa364 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.dataflow.context;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
import edu.uci.ics.pregelix.api.graph.Vertex;
public class RuntimeContext implements IWorkspaceFileFactory {
private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
private final IIndexLifecycleManager lcManager;
private final ILocalResourceRepository localResourceRepository;
private final ResourceIdFactory resourceIdFactory;
private final IBufferCache bufferCache;
private final IFileMapManager fileMapManager;
private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
private final IOManager ioManager;
private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
int pageSize = 64 * 1024;
long memSize = Runtime.getRuntime().maxMemory();
long bufferSize = memSize / 4;
int numPages = (int) (bufferSize / pageSize);
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
lcManager = new IndexLifecycleManager();
localResourceRepository = new TransientLocalResourceRepository();
resourceIdFactory = new ResourceIdFactory(0);
}
public void close() {
for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
for (FileReference fileRef : entry.getValue())
fileRef.delete();
iterationToFiles.clear();
bufferCache.close();
appStateMap.clear();
System.gc();
}
public ILocalResourceRepository getLocalResourceRepository() {
return localResourceRepository;
}
public ResourceIdFactory getResourceIdFactory() {
return resourceIdFactory;
}
public IIndexLifecycleManager getIndexLifecycleManager() {
return lcManager;
}
public IBufferCache getBufferCache() {
return bufferCache;
}
public IFileMapProvider getFileMapManager() {
return fileMapManager;
}
public Map<StateKey, IStateObject> getAppStateStore() {
return appStateMap;
}
public static RuntimeContext get(IHyracksTaskContext ctx) {
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
public synchronized void setVertexProperties(String giraphJobId, long numVertices, long numEdges) {
Boolean toMove = giraphJobIdToMove.get(giraphJobId);
if (toMove == null || toMove == true) {
if (giraphJobIdToSuperStep.get(giraphJobId) == null) {
giraphJobIdToSuperStep.put(giraphJobId, 0L);
}
long superStep = giraphJobIdToSuperStep.get(giraphJobId);
List<FileReference> files = iterationToFiles.remove(superStep - 1);
if (files != null) {
for (FileReference fileRef : files)
fileRef.delete();
}
Vertex.setSuperstep(++superStep);
Vertex.setNumVertices(numVertices);
Vertex.setNumEdges(numEdges);
giraphJobIdToSuperStep.put(giraphJobId, superStep);
giraphJobIdToMove.put(giraphJobId, false);
LOGGER.info("start iteration " + Vertex.getSuperstep());
}
System.gc();
}
public synchronized void endSuperStep(String giraphJobId) {
giraphJobIdToMove.put(giraphJobId, true);
LOGGER.info("end iteration " + Vertex.getSuperstep());
}
@Override
public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
final FileReference fRef = ioManager.createWorkspaceFile(prefix);
List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
if (files == null) {
files = new ArrayList<FileReference>();
iterationToFiles.put(Vertex.getSuperstep(), files);
}
files.add(fRef);
return fRef;
}
@Override
public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
return ioManager.createWorkspaceFile(prefix);
}
}