blob: 96529bd938355ef89f0a71a18515a5637b2c5dec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.config;
import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Multiplexing Exchange (MuxExchange) is used when results from multiple minor fragments belonging to the same
* major fragment running on a node need to be collected at one fragment on the same node before distributing the
* results further. This helps when the sender that is distributing the results has overhead that is proportional to
* the number of sender instances. An example of such sender is PartitionSender. Each instance of PartitionSender
* allocates "r" buffers where "r" is the number of receivers.
*
* Ex. Drillbit A is assigned 10 minor fragments belonging to the same major fragment. Each of these fragments
* has a PartitionSender instance which is sending data to 300 receivers. Each PartitionSender needs 300 buffers,
* so total of 10*300 buffers are needed. With MuxExchange, all 10 fragments send the data directly (without
* partitioning) to MuxExchange which uses the PartitionSender to partition the incoming data and distribute
* to receivers. MuxExchange has only one instance per Drillbit per major fragment which means only one instance of
* PartitionSender per Drillbit per major fragment. With MuxExchange total number of buffers used by PartitionSender
* for the 10 fragments is 300 instead of earlier number 10*300.
*/
public abstract class AbstractMuxExchange extends AbstractExchange {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMuxExchange.class);
// Ephemeral info used when creating execution fragments.
protected Map<Integer, MinorFragmentEndpoint> senderToReceiverMapping;
protected ArrayListMultimap<Integer, MinorFragmentEndpoint> receiverToSenderMapping;
private boolean isSenderReceiverMappingCreated;
public AbstractMuxExchange(@JsonProperty("child") PhysicalOperator child) {
super(child);
}
@Override
public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints) {
Preconditions.checkArgument(senderFragmentEndpoints != null && senderFragmentEndpoints.size() > 0,
"Sender fragment endpoint list should not be empty");
// We want to run one mux receiver per Drillbit endpoint.
// Identify the number of unique Drillbit endpoints in sender fragment endpoints.
List<DrillbitEndpoint> drillbitEndpoints = ImmutableSet.copyOf(senderFragmentEndpoints).asList();
List<EndpointAffinity> affinities = Lists.newArrayList();
for(DrillbitEndpoint ep : drillbitEndpoints) {
affinities.add(new EndpointAffinity(ep, Double.POSITIVE_INFINITY));
}
return ParallelizationInfo.create(affinities.size(), affinities.size(), affinities);
}
@Override
public Sender getSender(int minorFragmentId, PhysicalOperator child) {
createSenderReceiverMapping();
MinorFragmentEndpoint receiver = senderToReceiverMapping.get(minorFragmentId);
if (receiver == null) {
throw new IllegalStateException(String.format("Failed to find receiver for sender [%d]", minorFragmentId));
}
return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
}
protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
createSenderReceiverMapping();
List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
logger.debug("Minor fragment {}, receives data from following senders {}", minorFragmentId, senders);
if (senders == null || senders.size() <= 0) {
throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
}
return senders;
}
protected void createSenderReceiverMapping() {
if (isSenderReceiverMappingCreated) {
return;
}
senderToReceiverMapping = Maps.newHashMap();
receiverToSenderMapping = ArrayListMultimap.create();
// Find the list of sender fragment ids assigned to each Drillbit endpoint.
ArrayListMultimap<DrillbitEndpoint, Integer> endpointSenderList = ArrayListMultimap.create();
int senderFragmentId = 0;
for(DrillbitEndpoint senderLocation : senderLocations) {
endpointSenderList.put(senderLocation, senderFragmentId);
senderFragmentId++;
}
int receiverFragmentId = 0;
for(DrillbitEndpoint receiverLocation : receiverLocations) {
List<Integer> senderFragmentIds = endpointSenderList.get(receiverLocation);
for(Integer senderId : senderFragmentIds) {
senderToReceiverMapping.put(senderId, new MinorFragmentEndpoint(receiverFragmentId, receiverLocation));
receiverToSenderMapping.put(receiverFragmentId,
new MinorFragmentEndpoint(senderId, senderLocations.get(senderId)));
}
receiverFragmentId++;
}
isSenderReceiverMappingCreated = true;
}
}