blob: 911f3725744fca9c46fa4f0900564fc8c0f01f90 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc.dataset;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class ResultState implements IStateObject {
private static final String FILE_PREFIX = "result_";
private final ResultSetPartitionId resultSetPartitionId;
private final int frameSize;
private final IIOManager ioManager;
private final IWorkspaceFileFactory fileFactory;
private final AtomicBoolean eos;
private final AtomicBoolean failed;
private final List<Page> localPageList;
private FileReference fileRef;
private IFileHandle writeFileHandle;
private IFileHandle readFileHandle;
private long size;
private long persistentSize;
ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
this.ioManager = ioManager;
this.fileFactory = fileFactory;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
failed = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
fileRef = null;
writeFileHandle = null;
}
public synchronized void open() {
size = 0;
persistentSize = 0;
}
public synchronized void close() {
eos.set(true);
notifyAll();
}
public synchronized void closeAndDelete() {
// Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
// to be taken when there are more requests to these result states.
failed.set(true);
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
} catch (IOException e) {
// Since file handle could not be closed, just ignore.
}
}
if (fileRef != null) {
fileRef.delete();
}
}
public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
if (fileRef == null) {
String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
}
size += ioManager.syncWrite(writeFileHandle, size, buffer);
notifyAll();
}
public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
throws HyracksDataException {
int srcOffset = 0;
Page destPage = null;
if (!localPageList.isEmpty()) {
destPage = localPageList.get(localPageList.size() - 1);
}
while (srcOffset < buffer.limit()) {
if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
localPageList.add(destPage);
}
int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
srcOffset += srcLength;
size += srcLength;
}
notifyAll();
}
public synchronized void readOpen() {
// It is a noOp for now, leaving here to keep the API stable for future usage.
}
public synchronized void readClose() throws HyracksDataException {
if (readFileHandle != null) {
ioManager.close(readFileHandle);
}
}
public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
long readSize = 0;
while (offset >= size && !eos.get() && !failed.get()) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
if ((offset >= size && eos.get()) || failed.get()) {
return readSize;
}
if (readFileHandle == null) {
initReadFileHandle();
}
readSize = ioManager.syncRead(readFileHandle, offset, buffer);
return readSize;
}
public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
throws HyracksDataException {
long readSize = 0;
synchronized (this) {
while (offset >= size && !eos.get() && !failed.get()) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
if ((offset >= size && eos.get()) || failed.get()) {
return readSize;
}
if (offset < persistentSize) {
if (readFileHandle == null) {
initReadFileHandle();
}
readSize = ioManager.syncRead(readFileHandle, offset, buffer);
}
if (readSize < buffer.capacity()) {
long localPageOffset = offset - persistentSize;
int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
Page page = getPage(localPageIndex);
if (page == null) {
return readSize;
}
readSize += buffer.remaining();
buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
}
}
datasetMemoryManager.pageReferenced(resultSetPartitionId);
return readSize;
}
public synchronized void abort() {
failed.set(true);
notifyAll();
}
public synchronized Page returnPage() throws HyracksDataException {
Page page = removePage();
// If we do not have any pages to be given back close the write channel since we don't write any more, return null.
if (page == null) {
ioManager.close(writeFileHandle);
return null;
}
page.getBuffer().flip();
if (fileRef == null) {
String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
notifyAll();
}
long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
persistentSize += delta;
return page;
}
public synchronized void setEOS(boolean eos) {
this.eos.set(eos);
}
public ResultSetPartitionId getResultSetPartitionId() {
return resultSetPartitionId;
}
public int getFrameSize() {
return frameSize;
}
public IIOManager getIOManager() {
return ioManager;
}
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
}
@Override
public Object getId() {
return resultSetPartitionId;
}
@Override
public long getMemoryOccupancy() {
throw new UnsupportedOperationException();
}
@Override
public void toBytes(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void fromBytes(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
private Page getPage(int index) {
Page page = null;
if (!localPageList.isEmpty()) {
page = localPageList.get(index);
}
return page;
}
private Page removePage() {
Page page = null;
if (!localPageList.isEmpty()) {
page = localPageList.remove(localPageList.size() - 1);
}
return page;
}
private void initReadFileHandle() throws HyracksDataException {
while (fileRef == null && !failed.get()) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
if (failed.get()) {
return;
}
readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
}
}