blob: 2dcbb3e5e66546a4dd4494722ad7e632b259d0d2 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc.partitions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
/**
 * Pairs up partition descriptors with partition requests, both indexed by {@link PartitionId}.
 * <p>
 * Descriptors that could not be matched when they were registered, and requests that could not be
 * satisfied when they arrived, are parked in two maps until a counterpart shows up or until they are
 * purged by one of the removal methods. A per-partition list is dropped from its map as soon as it
 * becomes empty.
 * <p>
 * The class contains no synchronization of its own.
 */
public class PartitionMatchMaker {
    private static final Logger LOGGER = Logger.getLogger(PartitionMatchMaker.class.getName());

    /** Descriptors still waiting for a request, grouped by partition. */
    private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors =
            new HashMap<PartitionId, List<PartitionDescriptor>>();

    /** Requests still waiting for a descriptor, grouped by partition. */
    private final Map<PartitionId, List<PartitionRequest>> partitionRequests =
            new HashMap<PartitionId, List<PartitionRequest>>();

    /** Creates a match maker with no parked descriptors and no parked requests. */
    public PartitionMatchMaker() {
    }

    /**
     * Offers a newly available descriptor to the requests parked for the same partition.
     * <p>
     * Parked requests are visited in list order. A request is satisfied when the descriptor's state
     * is at least the request's minimum state; satisfied requests are removed from the parked list.
     * A non-reusable descriptor stops after the first satisfied request, a reusable one continues
     * through the remaining requests. If nothing was satisfied, the descriptor itself is parked.
     *
     * @param partitionDescriptor
     *            the descriptor being registered
     * @return the (descriptor, request) pairs produced by this call; empty if none
     */
    public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
            PartitionDescriptor partitionDescriptor) {
        List<Pair<PartitionDescriptor, PartitionRequest>> matches =
                new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
        PartitionId pid = partitionDescriptor.getPartitionId();
        List<PartitionRequest> waiting = partitionRequests.get(pid);
        if (waiting != null) {
            for (Iterator<PartitionRequest> it = waiting.iterator(); it.hasNext();) {
                PartitionRequest pending = it.next();
                if (!partitionDescriptor.getState().isAtLeast(pending.getMinimumState())) {
                    continue;
                }
                matches.add(Pair.of(partitionDescriptor, pending));
                it.remove();
                if (!partitionDescriptor.isReusable()) {
                    // A non-reusable descriptor can serve only a single request.
                    break;
                }
            }
            if (waiting.isEmpty()) {
                partitionRequests.remove(pid);
            }
        }
        // NOTE(review): a reusable descriptor that satisfied at least one request is not parked
        // here, while matchPartitionRequest keeps reusable descriptors around -- confirm that this
        // asymmetry is intended.
        if (matches.isEmpty()) {
            listFor(partitionDescriptors, pid).add(partitionDescriptor);
        }
        return matches;
    }

    /**
     * Tries to satisfy a request from the descriptors parked for the same partition.
     * <p>
     * The first parked descriptor (in list order) whose state is at least the request's minimum
     * state is chosen. A chosen descriptor is removed from the parked list unless it is reusable.
     * If no descriptor qualifies, the request is parked instead.
     *
     * @param partitionRequest
     *            the incoming request
     * @return the (descriptor, request) pair, or {@code null} if the request had to be parked
     */
    public Pair<PartitionDescriptor, PartitionRequest> matchPartitionRequest(PartitionRequest partitionRequest) {
        PartitionId pid = partitionRequest.getPartitionId();
        List<PartitionDescriptor> candidates = partitionDescriptors.get(pid);
        if (candidates != null) {
            PartitionDescriptor chosen = null;
            Iterator<PartitionDescriptor> it = candidates.iterator();
            while (chosen == null && it.hasNext()) {
                PartitionDescriptor candidate = it.next();
                if (candidate.getState().isAtLeast(partitionRequest.getMinimumState())) {
                    chosen = candidate;
                    if (!candidate.isReusable()) {
                        it.remove();
                    }
                }
            }
            if (candidates.isEmpty()) {
                partitionDescriptors.remove(pid);
            }
            if (chosen != null) {
                return Pair.of(chosen, partitionRequest);
            }
        }
        listFor(partitionRequests, pid).add(partitionRequest);
        return null;
    }

    /**
     * Reports the best state among the descriptors currently parked for a partition.
     *
     * @param pid
     *            the partition to look up
     * @return {@code null} if no descriptor list is recorded for {@code pid};
     *         {@link PartitionState#COMMITTED} if any parked descriptor is in that state;
     *         {@link PartitionState#STARTED} otherwise
     */
    public PartitionState getMaximumAvailableState(PartitionId pid) {
        List<PartitionDescriptor> known = partitionDescriptors.get(pid);
        if (known == null) {
            return null;
        }
        PartitionState best = PartitionState.STARTED;
        Iterator<PartitionDescriptor> it = known.iterator();
        while (best != PartitionState.COMMITTED && it.hasNext()) {
            if (it.next().getState() == PartitionState.COMMITTED) {
                best = PartitionState.COMMITTED;
            }
        }
        return best;
    }

    /** Predicate used to select the entries that a removal pass should discard. */
    private interface IEntryFilter<T> {
        public boolean matches(T o);
    }

    /** Returns the list stored under {@code pid}, creating and storing an empty one if absent. */
    private static <T> List<T> listFor(Map<PartitionId, List<T>> index, PartitionId pid) {
        List<T> bucket = index.get(pid);
        if (bucket == null) {
            bucket = new ArrayList<T>();
            index.put(pid, bucket);
        }
        return bucket;
    }

    /** Removes every element of {@code list} accepted by {@code filter}. */
    private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
        for (Iterator<T> it = list.iterator(); it.hasNext();) {
            if (filter.matches(it.next())) {
                it.remove();
            }
        }
    }

    /**
     * Applies {@code filter} to every list in {@code map} and drops the map entries whose list ends
     * up empty.
     */
    private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
        for (Iterator<Map.Entry<PartitionId, List<T>>> it = map.entrySet().iterator(); it.hasNext();) {
            List<T> bucket = it.next().getValue();
            removeEntries(bucket, filter);
            if (bucket.isEmpty()) {
                it.remove();
            }
        }
    }

    /**
     * Applies {@code filter} only to the lists stored under the given partition ids, dropping each
     * map entry whose list ends up empty. Ids without an entry are skipped.
     */
    private static <T> void removeEntriesFor(Map<PartitionId, List<T>> map, Set<PartitionId> pids,
            IEntryFilter<T> filter) {
        for (PartitionId pid : pids) {
            List<T> bucket = map.get(pid);
            if (bucket == null) {
                continue;
            }
            removeEntries(bucket, filter);
            if (bucket.isEmpty()) {
                map.remove(pid);
            }
        }
    }

    /**
     * Discards every parked descriptor and every parked request whose node id is contained in
     * {@code deadNodes}.
     *
     * @param deadNodes
     *            ids of the nodes whose entries are to be dropped
     */
    public void notifyNodeFailures(final Set<String> deadNodes) {
        IEntryFilter<PartitionDescriptor> deadDescriptor = new IEntryFilter<PartitionDescriptor>() {
            @Override
            public boolean matches(PartitionDescriptor o) {
                return deadNodes.contains(o.getNodeId());
            }
        };
        IEntryFilter<PartitionRequest> deadRequest = new IEntryFilter<PartitionRequest>() {
            @Override
            public boolean matches(PartitionRequest o) {
                return deadNodes.contains(o.getNodeId());
            }
        };
        removeEntries(partitionDescriptors, deadDescriptor);
        removeEntries(partitionRequests, deadRequest);
    }

    /**
     * For the given partitions, discards parked descriptors that are not in the
     * {@link PartitionState#COMMITTED} state and whose producing task attempt is in {@code taIds}.
     *
     * @param partitionIds
     *            the partitions to inspect
     * @param taIds
     *            the producing task attempts whose uncommitted descriptors are dropped
     */
    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
        LOGGER.info("Removing uncommitted partitions: " + partitionIds);
        IEntryFilter<PartitionDescriptor> uncommittedFromAttempts = new IEntryFilter<PartitionDescriptor>() {
            @Override
            public boolean matches(PartitionDescriptor o) {
                if (o.getState() == PartitionState.COMMITTED) {
                    return false;
                }
                return taIds.contains(o.getProducingTaskAttemptId());
            }
        };
        removeEntriesFor(partitionDescriptors, partitionIds, uncommittedFromAttempts);
    }

    /**
     * For the given partitions, discards parked requests whose requesting task attempt is in
     * {@code taIds}.
     *
     * @param partitionIds
     *            the partitions to inspect
     * @param taIds
     *            the requesting task attempts whose requests are dropped
     */
    public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
        LOGGER.info("Removing partition requests: " + partitionIds);
        IEntryFilter<PartitionRequest> requestedByAttempts = new IEntryFilter<PartitionRequest>() {
            @Override
            public boolean matches(PartitionRequest o) {
                return taIds.contains(o.getRequestingTaskAttemptId());
            }
        };
        removeEntriesFor(partitionRequests, partitionIds, requestedByAttempts);
    }
}