blob: f6f05d255e0920c006af942241adb6309d4aabfc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.client.dataset;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.dataset.IDatasetInputChannelMonitor;
import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.client.net.ClientNetworkManager;
import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel;
// TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
/**
 * Reads the result frames of one result set of a job, one partition after another.
 *
 * <p>Partition locations are fetched from the dataset directory service; for each non-empty
 * partition a {@link DatasetNetworkInputChannel} is opened and its buffers are copied into the
 * frame supplied to {@link #read(IFrame)}.
 *
 * <p>NOTE: the reading state (current partition, channel, monitor) lives in plain,
 * unsynchronized fields; concurrent calls to {@link #read(IFrame)} are not guarded here.
 */
public class HyracksDatasetReader implements IHyracksDatasetReader {
    private static final Logger LOGGER = Logger.getLogger(HyracksDatasetReader.class.getName());

    /** Buffer count passed to every {@link DatasetNetworkInputChannel} this reader creates. */
    private static final int NUM_READ_BUFFERS = 1;

    private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
    private final ClientNetworkManager netManager;
    private final DatasetClientContext datasetClientCtx;

    private JobId jobId;
    private ResultSetId resultSetId;

    /** Directory records indexed by partition; {@code null} until first fetched. */
    private DatasetDirectoryRecord[] knownRecords;

    /** Lazily created monitors, indexed by partition. */
    private IDatasetInputChannelMonitor[] monitors;

    /** Index of the partition currently being read; -1 before the first read. */
    private int lastReadPartition;

    /** Monitor registered on {@link #resultChannel}. */
    private IDatasetInputChannelMonitor lastMonitor;

    /** Channel of the partition currently being read. */
    private DatasetNetworkInputChannel resultChannel;

    /**
     * Creates a reader for the given result set. No network activity happens here; channels are
     * opened on demand by {@link #read(IFrame)}.
     *
     * @param datasetDirectoryServiceConnection connection used to query result status and locations
     * @param netManager network manager handed to the input channels
     * @param datasetClientCtx context used to open the input channels
     * @param jobId job whose result is read
     * @param resultSetId result set to read
     * @throws Exception declared for interface compatibility; nothing in this constructor throws it
     */
    public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection,
            ClientNetworkManager netManager, DatasetClientContext datasetClientCtx, JobId jobId,
            ResultSetId resultSetId) throws Exception {
        this.datasetDirectoryServiceConnection = datasetDirectoryServiceConnection;
        this.netManager = netManager;
        this.datasetClientCtx = datasetClientCtx;
        this.jobId = jobId;
        this.resultSetId = resultSetId;
        knownRecords = null;
        monitors = null;
        lastReadPartition = -1;
        lastMonitor = null;
        resultChannel = null;
    }

    /**
     * Asks the directory service for the status of the result set.
     *
     * @return the reported status, or {@code null} if the lookup failed (the failure is logged)
     */
    @Override
    public Status getResultStatus() {
        try {
            return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
        } catch (Exception e) {
            // TODO(madhusudancs): Decide what to do in case of error
            // Until then keep the best-effort contract (null result) but leave a trace of the failure.
            LOGGER.log(Level.WARNING,
                    "Failed to get result status for job " + jobId + ", result set " + resultSetId, e);
            return null;
        }
    }

    /**
     * Returns the directory record of a partition, querying the directory service (and passing it
     * the records known so far) until that partition's record is present.
     */
    private DatasetDirectoryRecord getRecord(int partition) throws Exception {
        while (knownRecords == null || knownRecords[partition] == null) {
            knownRecords = datasetDirectoryServiceConnection
                    .getDatasetResultLocations(jobId, resultSetId, knownRecords);
        }
        return knownRecords[partition];
    }

    /**
     * Moves to the next non-empty partition and opens a channel to it.
     *
     * @return {@code true} if a channel was opened; {@code false} if all remaining partitions are
     *         empty, in which case {@code lastReadPartition} ends up equal to the partition count
     * @throws HyracksDataException wrapping any failure while locating or opening the partition
     */
    private boolean nextPartition() throws HyracksDataException {
        ++lastReadPartition;
        try {
            DatasetDirectoryRecord record = getRecord(lastReadPartition);
            // Skip over partitions flagged as empty.
            while (record.getEmpty() && (++lastReadPartition) < knownRecords.length) {
                record = getRecord(lastReadPartition);
            }
            if (lastReadPartition == knownRecords.length) {
                return false;
            }
            resultChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId,
                    resultSetId, lastReadPartition, NUM_READ_BUFFERS);
            lastMonitor = getMonitor(lastReadPartition);
            resultChannel.registerMonitor(lastMonitor);
            resultChannel.open(datasetClientCtx);
            return true;
        } catch (Exception e) {
            throw new HyracksDataException(e);
        }
    }

    /**
     * Fills {@code frame} with result data, blocking until data, end of stream or a failure is
     * signalled for the current partition.
     *
     * @param frame frame to fill; it is reset first
     * @return the buffer position reached after copying (0 when there is nothing left to read)
     * @throws HyracksDataException if the job failed, the wait was interrupted, or a partition
     *         could not be opened
     */
    @Override
    public int read(IFrame frame) throws HyracksDataException {
        frame.reset();
        int readSize = 0;
        if (lastReadPartition == -1) {
            if (!nextPartition()) {
                return readSize;
            }
        } else if (isExhausted()) {
            // A previous call already ran past the final partition (trailing partitions were empty).
            // Without this guard the loop below would index knownRecords out of bounds, or
            // synchronize on a null monitor when every partition was empty.
            frame.getBuffer().flip();
            return readSize;
        }

        while (readSize < frame.getFrameSize()
                && !(isLastPartition() && isPartitionReadComplete(lastMonitor))) {
            waitForNextFrame(lastMonitor);
            if (isPartitionReadComplete(lastMonitor)) {
                knownRecords[lastReadPartition].readEOS();
                resultChannel.close();
                if (isLastPartition() || !nextPartition()) {
                    break;
                }
            } else {
                ByteBuffer readBuffer = resultChannel.getNextBuffer();
                lastMonitor.notifyFrameRead();
                if (readBuffer != null) {
                    if (readSize == 0) {
                        // First buffer of this call: size the frame from the buffer's header.
                        int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
                        frame.getBuffer().clear();
                    }
                    frame.getBuffer().put(readBuffer);
                    resultChannel.recycleBuffer(readBuffer);
                    readSize = frame.getBuffer().position();
                }
            }
        }
        frame.getBuffer().flip();
        return readSize;
    }

    /** Tells whether the reader has advanced past the last known partition. */
    private boolean isExhausted() {
        return knownRecords != null && lastReadPartition >= knownRecords.length;
    }

    /** Tells whether the current partition is the final one. */
    private boolean isLastPartition() {
        return lastReadPartition == knownRecords.length - 1;
    }

    /**
     * Blocks on the monitor until a frame is available, end of stream is reached, or a failure
     * is reported.
     *
     * @throws HyracksDataException if the wait is interrupted or the monitor reports a failure
     */
    private static void waitForNextFrame(IDatasetInputChannelMonitor monitor) throws HyracksDataException {
        synchronized (monitor) {
            while (monitor.getNFramesAvailable() <= 0 && !monitor.eosReached() && !monitor.failed()) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new HyracksDataException(e);
                }
            }
        }
        if (monitor.failed()) {
            throw new HyracksDataException("Job Failed.");
        }
    }

    /** A partition is fully read once no frames are pending and end of stream was signalled. */
    private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
        return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached());
    }

    /** Builds the socket address of the node named by the record's network address. */
    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
        NetworkAddress netAddr = addr.getNetworkAddress();
        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
    }

    /**
     * Returns the monitor of a partition, creating it (and the monitor array) on first use.
     *
     * @throws HyracksException if the partition's directory record has not been fetched yet
     */
    private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
        if (knownRecords == null || knownRecords[partition] == null) {
            throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
        }
        if (monitors == null) {
            monitors = new DatasetInputChannelMonitor[knownRecords.length];
        }
        if (monitors[partition] == null) {
            monitors[partition] = new DatasetInputChannelMonitor();
        }
        return monitors[partition];
    }

    /**
     * Tracks pending frames, end of stream and failure for one input channel. Notification
     * methods wake up threads waiting on this object (see {@code waitForNextFrame}).
     */
    private static final class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
        private final AtomicInteger nAvailableFrames;
        private final AtomicBoolean eos;
        private final AtomicBoolean failed;

        public DatasetInputChannelMonitor() {
            nAvailableFrames = new AtomicInteger(0);
            eos = new AtomicBoolean(false);
            failed = new AtomicBoolean(false);
        }

        @Override
        public synchronized void notifyFailure(IInputChannel channel) {
            failed.set(true);
            notifyAll();
        }

        @Override
        public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
            nAvailableFrames.addAndGet(nFrames);
            notifyAll();
        }

        @Override
        public synchronized void notifyEndOfStream(IInputChannel channel) {
            eos.set(true);
            notifyAll();
        }

        @Override
        public synchronized boolean eosReached() {
            return eos.get();
        }

        @Override
        public synchronized boolean failed() {
            return failed.get();
        }

        @Override
        public synchronized int getNFramesAvailable() {
            return nAvailableFrames.get();
        }

        @Override
        public synchronized void notifyFrameRead() {
            nAvailableFrames.decrementAndGet();
        }
    }
}