| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.cc.dataset; |
| |
| import java.util.Arrays; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| import edu.uci.ics.hyracks.api.comm.NetworkAddress; |
| import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord; |
| import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord; |
| import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status; |
| import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService; |
| import edu.uci.ics.hyracks.api.dataset.ResultSetId; |
| import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksException; |
| import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory; |
| import edu.uci.ics.hyracks.api.job.JobId; |
| |
| /** |
| * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs |
| * location information is never evicted from the memory and the memory usage grows as the number of jobs in the system |
| * grows. What we should possibly do is, add an API call for the client to say that it received everything it has to for |
| * the job (after it receives all the results) completely. Then we can just get rid of the location information for that |
| * job. |
| */ |
| public class DatasetDirectoryService implements IDatasetDirectoryService { |
| private final Map<JobId, DatasetJobRecord> jobResultLocations; |
| |
| public DatasetDirectoryService(final int jobHistorySize) { |
| jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() { |
| private static final long serialVersionUID = 1L; |
| |
| protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) { |
| return size() > jobHistorySize; |
| } |
| }; |
| } |
| |
| @Override |
| public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException { |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| if (djr == null) { |
| djr = new DatasetJobRecord(); |
| jobResultLocations.put(jobId, djr); |
| } |
| } |
| |
| @Override |
| public void notifyJobStart(JobId jobId) throws HyracksException { |
| // Auto-generated method stub |
| } |
| |
| @Override |
| public void notifyJobFinish(JobId jobId) throws HyracksException { |
| // Auto-generated method stub |
| } |
| |
| @Override |
| public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, |
| int partition, int nPartitions, NetworkAddress networkAddress) { |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| |
| ResultSetMetaData resultSetMetaData = djr.get(rsId); |
| if (resultSetMetaData == null) { |
| resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]); |
| djr.put(rsId, resultSetMetaData); |
| } |
| |
| DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); |
| if (records[partition] == null) { |
| records[partition] = new DatasetDirectoryRecord(); |
| } |
| records[partition].setNetworkAddress(networkAddress); |
| records[partition].start(); |
| notifyAll(); |
| } |
| |
| @Override |
| public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) { |
| int successCount = 0; |
| |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| ResultSetMetaData resultSetMetaData = djr.get(rsId); |
| DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); |
| records[partition].writeEOS(); |
| |
| for (DatasetDirectoryRecord record : records) { |
| if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) { |
| successCount++; |
| } |
| } |
| if (successCount == records.length) { |
| djr.success(); |
| } |
| notifyAll(); |
| } |
| |
| @Override |
| public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) { |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| djr.fail(); |
| notifyAll(); |
| } |
| |
| @Override |
| public synchronized void reportJobFailure(JobId jobId) { |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| djr.fail(); |
| notifyAll(); |
| } |
| |
| @Override |
| public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException { |
| DatasetJobRecord djr; |
| while ((djr = jobResultLocations.get(jobId)) == null) { |
| try { |
| wait(); |
| } catch (InterruptedException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| |
| return djr.getStatus(); |
| } |
| |
| @Override |
| public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId, |
| DatasetDirectoryRecord[] knownRecords) throws HyracksDataException { |
| DatasetDirectoryRecord[] newRecords; |
| while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) { |
| try { |
| wait(); |
| } catch (InterruptedException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| return newRecords; |
| } |
| |
| /** |
| * Compares the records already known by the client for the given job's result set id with the records that the |
| * dataset directory service knows and if there are any newly discovered records returns a whole array with the |
| * new records filled in. |
| * This method has a very convoluted logic. Here is the explanation of how it works. |
| * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in |
| * the order of the partitions. It always traverses the array in the first to last order! |
| * If known records array or the first element in that array is null in the but the record for that partition now |
| * known to the directory service, the method fills in that record in the array and returns the array back. |
| * However, if the first known null record is not a first element in the array, by induction, all the previous |
| * known records should be known already be known to client and none of the records for the partitions ahead is |
| * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding |
| * to the record before the first known null record, i.e. the last known non-null record. If not, we just return |
| * null because we cannot expose any new locations until the client reaches end of stream for the last known record. |
| * If the client has reached the end of stream record for the last known non-null record, we check if the next record |
| * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or |
| * send null otherwise. |
| * If the ordering is not required, we are free to return any newly discovered records back, so we just check if |
| * arrays are equal and if they are not we send the entire new updated array. |
| * |
| * @param jobId |
| * - Id of the job for which the directory records should be retrieved. |
| * @param rsId |
| * - Id of the result set for which the directory records should be retrieved. |
| * @param knownRecords |
| * - An array of directory records that the client is already aware of. |
| * @return |
| * - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint |
| * @throws HyracksDataException |
| * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for |
| * every check. This already looks very expensive. |
| */ |
| private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) |
| throws HyracksDataException { |
| DatasetJobRecord djr = jobResultLocations.get(jobId); |
| |
| if (djr == null) { |
| throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist"); |
| } |
| |
| if (djr.getStatus() == Status.FAILED) { |
| throw new HyracksDataException("Job failed."); |
| } |
| |
| ResultSetMetaData resultSetMetaData = djr.get(rsId); |
| if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) { |
| return null; |
| } |
| |
| boolean ordered = resultSetMetaData.getOrderedResult(); |
| DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); |
| /* If ordering is required, we should expose the dataset directory records only in the order, otherwise |
| * we can simply check if there are any newly discovered records and send the whole array back if there are. |
| */ |
| if (ordered) { |
| // Iterate over the known records and find the last record which is not null. |
| int i = 0; |
| for (i = 0; i < records.length; i++) { |
| if (knownRecords == null) { |
| if (records[0] != null) { |
| knownRecords = new DatasetDirectoryRecord[records.length]; |
| knownRecords[0] = records[0]; |
| return knownRecords; |
| } |
| return null; |
| } |
| if (knownRecords[i] == null) { |
| if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) { |
| knownRecords[i] = records[i]; |
| return knownRecords; |
| } |
| return null; |
| } |
| } |
| } else { |
| if (!Arrays.equals(records, knownRecords)) { |
| return records; |
| } |
| } |
| return null; |
| } |
| } |