| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.cc.partitions; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.logging.Logger; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| |
| import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId; |
| import edu.uci.ics.hyracks.api.partitions.PartitionId; |
| import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor; |
| import edu.uci.ics.hyracks.control.common.job.PartitionRequest; |
| import edu.uci.ics.hyracks.control.common.job.PartitionState; |
| |
| public class PartitionMatchMaker { |
| private static final Logger LOGGER = Logger.getLogger(PartitionMatchMaker.class.getName()); |
| |
| private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors; |
| |
| private final Map<PartitionId, List<PartitionRequest>> partitionRequests; |
| |
| public PartitionMatchMaker() { |
| partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>(); |
| partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>(); |
| } |
| |
| public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor( |
| PartitionDescriptor partitionDescriptor) { |
| List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>(); |
| PartitionId pid = partitionDescriptor.getPartitionId(); |
| boolean matched = false; |
| List<PartitionRequest> requests = partitionRequests.get(pid); |
| if (requests != null) { |
| Iterator<PartitionRequest> i = requests.iterator(); |
| while (i.hasNext()) { |
| PartitionRequest req = i.next(); |
| if (partitionDescriptor.getState().isAtLeast(req.getMinimumState())) { |
| matches.add(Pair.<PartitionDescriptor, PartitionRequest> of(partitionDescriptor, req)); |
| i.remove(); |
| matched = true; |
| if (!partitionDescriptor.isReusable()) { |
| break; |
| } |
| } |
| } |
| if (requests.isEmpty()) { |
| partitionRequests.remove(pid); |
| } |
| } |
| |
| if (!matched) { |
| List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); |
| if (descriptors == null) { |
| descriptors = new ArrayList<PartitionDescriptor>(); |
| partitionDescriptors.put(pid, descriptors); |
| } |
| descriptors.add(partitionDescriptor); |
| } |
| |
| return matches; |
| } |
| |
| public Pair<PartitionDescriptor, PartitionRequest> matchPartitionRequest(PartitionRequest partitionRequest) { |
| Pair<PartitionDescriptor, PartitionRequest> match = null; |
| |
| PartitionId pid = partitionRequest.getPartitionId(); |
| |
| List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); |
| if (descriptors != null) { |
| Iterator<PartitionDescriptor> i = descriptors.iterator(); |
| while (i.hasNext()) { |
| PartitionDescriptor descriptor = i.next(); |
| if (descriptor.getState().isAtLeast(partitionRequest.getMinimumState())) { |
| match = Pair.<PartitionDescriptor, PartitionRequest> of(descriptor, partitionRequest); |
| if (!descriptor.isReusable()) { |
| i.remove(); |
| } |
| break; |
| } |
| } |
| if (descriptors.isEmpty()) { |
| partitionDescriptors.remove(pid); |
| } |
| } |
| |
| if (match == null) { |
| List<PartitionRequest> requests = partitionRequests.get(pid); |
| if (requests == null) { |
| requests = new ArrayList<PartitionRequest>(); |
| partitionRequests.put(pid, requests); |
| } |
| requests.add(partitionRequest); |
| } |
| |
| return match; |
| } |
| |
| public PartitionState getMaximumAvailableState(PartitionId pid) { |
| List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); |
| if (descriptors == null) { |
| return null; |
| } |
| for (PartitionDescriptor descriptor : descriptors) { |
| if (descriptor.getState() == PartitionState.COMMITTED) { |
| return PartitionState.COMMITTED; |
| } |
| } |
| return PartitionState.STARTED; |
| } |
| |
| private interface IEntryFilter<T> { |
| public boolean matches(T o); |
| } |
| |
| private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) { |
| Iterator<T> j = list.iterator(); |
| while (j.hasNext()) { |
| T o = j.next(); |
| if (filter.matches(o)) { |
| j.remove(); |
| } |
| } |
| } |
| |
| private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) { |
| Iterator<Map.Entry<PartitionId, List<T>>> i = map.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry<PartitionId, List<T>> e = i.next(); |
| List<T> list = e.getValue(); |
| removeEntries(list, filter); |
| if (list.isEmpty()) { |
| i.remove(); |
| } |
| } |
| } |
| |
| public void notifyNodeFailures(final Set<String> deadNodes) { |
| removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() { |
| @Override |
| public boolean matches(PartitionDescriptor o) { |
| return deadNodes.contains(o.getNodeId()); |
| } |
| }); |
| removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() { |
| @Override |
| public boolean matches(PartitionRequest o) { |
| return deadNodes.contains(o.getNodeId()); |
| } |
| }); |
| } |
| |
| public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { |
| LOGGER.info("Removing uncommitted partitions: " + partitionIds); |
| IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() { |
| @Override |
| public boolean matches(PartitionDescriptor o) { |
| return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId()); |
| } |
| }; |
| for (PartitionId pid : partitionIds) { |
| List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); |
| if (descriptors != null) { |
| removeEntries(descriptors, filter); |
| if (descriptors.isEmpty()) { |
| partitionDescriptors.remove(pid); |
| } |
| } |
| } |
| } |
| |
| public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { |
| LOGGER.info("Removing partition requests: " + partitionIds); |
| IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() { |
| @Override |
| public boolean matches(PartitionRequest o) { |
| return taIds.contains(o.getRequestingTaskAttemptId()); |
| } |
| }; |
| for (PartitionId pid : partitionIds) { |
| List<PartitionRequest> requests = partitionRequests.get(pid); |
| if (requests != null) { |
| removeEntries(requests, filter); |
| if (requests.isEmpty()) { |
| partitionRequests.remove(pid); |
| } |
| } |
| } |
| } |
| } |