| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.querymaster; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.tajo.ExecutionBlockId; |
| import org.apache.tajo.ResourceProtos.FetchProto; |
| import org.apache.tajo.SessionVars; |
| import org.apache.tajo.algebra.JoinType; |
| import org.apache.tajo.annotation.NotNull; |
| import org.apache.tajo.catalog.*; |
| import org.apache.tajo.catalog.statistics.StatisticsUtil; |
| import org.apache.tajo.catalog.statistics.TableStats; |
| import org.apache.tajo.conf.TajoConf.ConfVars; |
| import org.apache.tajo.engine.planner.PhysicalPlannerImpl; |
| import org.apache.tajo.engine.planner.RangePartitionAlgorithm; |
| import org.apache.tajo.engine.planner.UniformRangePartition; |
| import org.apache.tajo.engine.planner.enforce.Enforcer; |
| import org.apache.tajo.engine.planner.global.DataChannel; |
| import org.apache.tajo.engine.planner.global.ExecutionBlock; |
| import org.apache.tajo.engine.planner.global.MasterPlan; |
| import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil; |
| import org.apache.tajo.engine.utils.TupleUtil; |
| import org.apache.tajo.exception.TajoException; |
| import org.apache.tajo.exception.TajoInternalError; |
| import org.apache.tajo.exception.UndefinedTableException; |
| import org.apache.tajo.plan.logical.*; |
| import org.apache.tajo.plan.logical.SortNode.SortPurpose; |
| import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; |
| import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; |
| import org.apache.tajo.plan.util.PlannerUtil; |
| import org.apache.tajo.querymaster.Task.IntermediateEntry; |
| import org.apache.tajo.querymaster.Task.PullHost; |
| import org.apache.tajo.storage.*; |
| import org.apache.tajo.storage.fragment.FileFragment; |
| import org.apache.tajo.storage.fragment.Fragment; |
| import org.apache.tajo.unit.StorageUnit; |
| import org.apache.tajo.util.Pair; |
| import org.apache.tajo.util.TUtil; |
| import org.apache.tajo.util.TajoIdUtils; |
| import org.apache.tajo.worker.FetchImpl; |
| import org.apache.tajo.worker.FetchImpl.RangeParam; |
| |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.math.BigInteger; |
| import java.net.URI; |
| import java.net.URLEncoder; |
| import java.util.*; |
| import java.util.Map.Entry; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; |
| import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*; |
| |
| /** |
| * Repartitioner creates non-leaf tasks and shuffles intermediate data. |
 * It supports two repartition methods: hash and range repartition.
| */ |
| public class Repartitioner { |
| private static final Log LOG = LogFactory.getLog(Repartitioner.class); |
| |
| private final static String UNKNOWN_HOST = "unknown"; |
| |
| public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage) |
| throws IOException, TajoException { |
| ExecutionBlock execBlock = stage.getBlock(); |
| QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); |
| |
| ScanNode[] scans = execBlock.getScanNodes(); |
| Fragment[] fragments = new Fragment[scans.length]; |
| long[] stats = new long[scans.length]; |
| |
| // initialize variables from the child operators |
| for (int i = 0; i < scans.length; i++) { |
| TableDesc tableDesc = masterContext.getTableDesc(scans[i]); |
| |
      if (tableDesc == null) { // if it is an intermediate result rather than a table stored on storage
| if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { |
| for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) { |
| ExecutionBlockId originScanEbId = unionScanEntry.getKey(); |
| stats[i] += masterContext.getStage(originScanEbId).getResultStats().getNumBytes(); |
| } |
| } else { |
| ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName()); |
| stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes(); |
| } |
| |
| // TODO - We should remove dummy fragment usages |
| fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0, |
| new String[]{UNKNOWN_HOST}); |
| |
| } else { |
| |
| stats[i] = GlobalPlanRewriteUtil.computeDescendentVolume(scans[i]); |
| |
        // If the table has no data, the tablespace will return an empty fragment list.
        // So, we need to check the list size before taking the first fragment.
        // Without this check, an IndexOutOfBoundsException could occur.
| Tablespace space = TablespaceManager.get(tableDesc.getUri()); |
| List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc, null); |
| if (fileFragments.size() > 0) { |
| fragments[i] = fileFragments.get(0); |
| } else { |
| fragments[i] = new FileFragment(scans[i].getCanonicalName(), |
| new Path(tableDesc.getUri()), 0, 0, new String[]{UNKNOWN_HOST}); |
| } |
| } |
| } |
| |
    // If one of the inner join tables has no input data, this execution block produces no result rows.
| JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN); |
| if (joinNode != null) { |
| if ( (joinNode.getJoinType() == JoinType.INNER)) { |
| LogicalNode leftNode = joinNode.getLeftChild(); |
| LogicalNode rightNode = joinNode.getRightChild(); |
| for (int i = 0; i < stats.length; i++) { |
| if (scans[i].getPID() == leftNode.getPID() || scans[i].getPID() == rightNode.getPID()) { |
| if (stats[i] == 0) { |
            LOG.info(scans[i] + "'s input data is zero. The inner join's result is empty.");
| return; |
| } |
| } |
| } |
| } |
| } |
| |
    // If the node is an outer join and a preserved relation is empty, it should return zero rows.
| joinNode = PlannerUtil.findTopNode(execBlock.getPlan(), NodeType.JOIN); |
| if (joinNode != null) { |
| // If all stats are zero, return |
| boolean isEmptyAllJoinTables = true; |
| for (long stat : stats) { |
| if (stat > 0) { |
| isEmptyAllJoinTables = false; |
| break; |
| } |
| } |
| if (isEmptyAllJoinTables) { |
| LOG.info("All input join tables are empty."); |
| return; |
| } |
| |
      // find the top scan nodes of the left and right children
| ScanNode leftScanNode = PlannerUtil.findTopNode(joinNode.getLeftChild(), NodeType.SCAN); |
| ScanNode rightScanNode = PlannerUtil.findTopNode(joinNode.getRightChild(), NodeType.SCAN); |
| |
| long leftStats = -1; |
| long rightStats = -1; |
| if (stats.length == 2) { |
| for (int i = 0; i < stats.length; i++) { |
| if (scans[i].equals(leftScanNode)) { |
| leftStats = stats[i]; |
| } else if (scans[i].equals(rightScanNode)) { |
| rightStats = stats[i]; |
| } |
| } |
| if (joinNode.getJoinType() == JoinType.LEFT_OUTER) { |
| if (leftStats == 0) { |
| return; |
| } |
| } |
| if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) { |
| if (rightStats == 0) { |
| return; |
| } |
| } |
| } |
| } |
| |
| // Assigning either fragments or fetch urls to query units |
| if (execBlock.hasBroadcastRelation()) { // If some relations of this EB are broadcasted |
| boolean hasNonLeafNode = false; |
| List<Integer> largeScanIndexList = new ArrayList<>(); |
| List<Integer> broadcastIndexList = new ArrayList<>(); |
| String nonLeafScanNames = ""; |
| String namePrefix = ""; |
| long maxStats = Long.MIN_VALUE; |
| int maxStatsScanIdx = -1; |
| StringBuilder nonLeafScanNamesBuilder = new StringBuilder(); |
| |
| String intermediateDataFormat = schedulerContext.getMasterContext().getConf().getVar(ConfVars.SHUFFLE_FILE_FORMAT); |
| for (int i = 0; i < scans.length; i++) { |
| |
| if (scans[i].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase(intermediateDataFormat)) { |
| // Intermediate data scan |
| hasNonLeafNode = true; |
| largeScanIndexList.add(i); |
| nonLeafScanNamesBuilder.append(namePrefix).append(scans[i].getCanonicalName()); |
| namePrefix = ","; |
| } |
| if (execBlock.isBroadcastRelation(scans[i])) { |
| broadcastIndexList.add(i); |
| } else { |
| // finding largest table. |
| if (stats[i] > 0 && stats[i] > maxStats) { |
| maxStats = stats[i]; |
| maxStatsScanIdx = i; |
| } |
| } |
| } |
| nonLeafScanNames = nonLeafScanNamesBuilder.toString(); |
| if (maxStatsScanIdx == -1) { |
| maxStatsScanIdx = 0; |
| } |
| |
| if (!hasNonLeafNode) { |
| if (largeScanIndexList.size() > 1) { |
| StringBuilder largeTableNamesBuilder = new StringBuilder(); |
| for (Integer eachId : largeScanIndexList) { |
| largeTableNamesBuilder.append(scans[eachId].getTableName()).append(','); |
| } |
| throw new IOException("Broadcast join with leaf node should have only one large table, " + |
| "but " + largeScanIndexList.size() + ", tables=" + largeTableNamesBuilder.toString()); |
| } |
| int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0); |
| LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", |
| scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); |
| scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); |
| } else { |
| if (largeScanIndexList.size() > 2) { |
        throw new IOException("Symmetric Repartition Join should have two scan nodes, but " + nonLeafScanNames);
| } |
| |
      // select intermediate scans and stats
| long[] intermediateScanStats = new long[largeScanIndexList.size()]; |
| Fragment[] intermediateFragments = new Fragment[largeScanIndexList.size()]; |
| int index = 0; |
| for (Integer eachIdx : largeScanIndexList) { |
| intermediateScanStats[index] = stats[eachIdx]; |
| intermediateFragments[index++] = fragments[eachIdx]; |
| } |
| Fragment[] broadcastFragments = new Fragment[broadcastIndexList.size()]; |
| ScanNode[] broadcastScans = new ScanNode[broadcastIndexList.size()]; |
| long[] broadcastStats = new long[broadcastIndexList.size()]; |
| index = 0; |
| for (Integer eachIdx : broadcastIndexList) { |
| scans[eachIdx].setBroadcastTable(true); |
| broadcastScans[index] = scans[eachIdx]; |
| broadcastStats[index] = stats[eachIdx]; |
| broadcastFragments[index] = fragments[eachIdx]; |
| index++; |
| } |
| LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", nonLeafScanNames)); |
| scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, |
| intermediateScanStats, intermediateFragments, broadcastScans, broadcastStats, broadcastFragments); |
| } |
| } else { |
| LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join"); |
| scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, stats, fragments, null, null, null); |
| } |
| } |
| |
| /** |
| * Scheduling in tech case of Symmetric Repartition Join |
| * @param masterContext |
| * @param schedulerContext |
| * @param stage |
| * @param stats |
| * @param fragments |
| * @throws IOException |
| */ |
| private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext masterContext, |
| TaskSchedulerContext schedulerContext, |
| Stage stage, |
| long[] stats, |
| Fragment[] fragments, |
| ScanNode[] broadcastScans, |
| long[] broadcastStats, |
| Fragment[] broadcastFragments) throws IOException, TajoException { |
| MasterPlan masterPlan = stage.getMasterPlan(); |
| ExecutionBlock execBlock = stage.getBlock(); |
    // The hash map is modeled as follows:
    // <Part Id, <EbId, List<Intermediate Data>>>
| Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries = |
| new HashMap<>(); |
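    // e.g. (hypothetical): if partition 3 of child block eb_1 was produced on two pull hosts,
    // the map would look like { 3 -> { eb_1 -> [ entry(host_A), entry(host_B) ] } }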
| |
| // Grouping IntermediateData by a partition key and a table name |
| List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId()); |
| |
    // In the case of a join with union, there is one ScanNode for the union.
| Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap(); |
| for (ExecutionBlock childBlock : childBlocks) { |
| ExecutionBlockId scanEbId = unionScanMap.get(childBlock.getId()); |
| if (scanEbId == null) { |
| scanEbId = childBlock.getId(); |
| } |
| Stage childExecSM = stage.getContext().getStage(childBlock.getId()); |
| |
| if (childExecSM.getHashShuffleIntermediateEntries() != null && |
| !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) { |
| for (IntermediateEntry intermediateEntry: childExecSM.getHashShuffleIntermediateEntries()) { |
| intermediateEntry.setEbId(childBlock.getId()); |
| if (hashEntries.containsKey(intermediateEntry.getPartId())) { |
| Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = |
| hashEntries.get(intermediateEntry.getPartId()); |
| |
| if (tbNameToInterm.containsKey(scanEbId)) { |
| tbNameToInterm.get(scanEbId).add(intermediateEntry); |
| } else { |
| tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry)); |
| } |
| } else { |
| Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = |
| new HashMap<>(); |
| tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry)); |
| hashEntries.put(intermediateEntry.getPartId(), tbNameToInterm); |
| } |
| } |
| } else { |
        // if there is no intermediate data (i.e., an empty table), make an empty entry
| int emptyPartitionId = 0; |
| if (hashEntries.containsKey(emptyPartitionId)) { |
| Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId); |
| if (tbNameToInterm.containsKey(scanEbId)) |
| tbNameToInterm.get(scanEbId).addAll(new ArrayList<>()); |
| else |
| tbNameToInterm.put(scanEbId, new ArrayList<>()); |
| } else { |
| Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = |
| new HashMap<>(); |
| tbNameToInterm.put(scanEbId, new ArrayList<>()); |
| hashEntries.put(emptyPartitionId, tbNameToInterm); |
| } |
| } |
| } |
| |
    // hashEntries can be empty if there is no input data.
    // In that case, dividing by hashEntries.size() would throw an ArithmeticException,
    // so the guards below avoid it.
| long leftStats = stats[0]; |
| long rightStats = stats.length == 2 ? stats[1] : broadcastStats[0]; |
| int[] avgSize = new int[2]; |
| avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (leftStats / hashEntries.size()); |
| avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats.length == 2 ? (rightStats / hashEntries.size()) : rightStats); |
| int bothFetchSize = avgSize[0] + avgSize[1]; |
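    // e.g. (hypothetical, both relations repartitioned): leftStats = 64MB, rightStats = 32MB,
    // and 16 shuffle keys give avgSize[0] = 4MB, avgSize[1] = 2MB, and bothFetchSize = 6MB.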
| |
    // Get the desired number of join tasks according to the volume
    // of the larger table
    long largerStat = leftStats >= rightStats ? leftStats : rightStats;
    int desiredJoinTaskVolume = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
| |
| // calculate the number of tasks according to the data size |
| int mb = (int) Math.ceil((double) largerStat / 1048576); |
| LOG.info("Larger intermediate data is approximately " + mb + " MB"); |
    // determine the number of tasks by the desired task volume (in MB)
    int maxTaskNum = (int) Math.ceil((double) mb / desiredJoinTaskVolume);
| LOG.info("The calculated number of tasks is " + maxTaskNum); |
| LOG.info("The number of total shuffle keys is " + hashEntries.size()); |
| // the number of join tasks cannot be larger than the number of |
| // distinct partition ids. |
| int joinTaskNum = Math.min(maxTaskNum, hashEntries.size()); |
| LOG.info("The determined number of join tasks is " + joinTaskNum); |
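    // e.g. (hypothetical): largerStat = 640MB and a desired task volume of 64MB give
    // maxTaskNum = 10; with 8 distinct shuffle keys, joinTaskNum = min(10, 8) = 8.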
| |
| List<Fragment> rightFragments = new ArrayList<>(); |
| if (fragments.length == 2) { |
| rightFragments.add(fragments[1]); |
| } |
| |
| if (broadcastFragments != null) { |
      // In this phase a ScanNode has a single fragment.
      // If there is more than one data file, those files should be added to the fragments or the partition path.
| |
| for (ScanNode eachScan: broadcastScans) { |
| |
| Path[] partitionScanPaths = null; |
| TableDesc tableDesc = masterContext.getTableDesc(eachScan); |
| Tablespace space = TablespaceManager.get(tableDesc.getUri()); |
| |
| if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { |
| |
| PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; |
| partitionScanPaths = partitionScan.getInputPaths(); |
          // getFragmentsFromPartitionedTable() sets inputPaths to null, so restore the saved paths afterwards
| getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc); |
| partitionScan.setInputPaths(partitionScanPaths); |
| |
| } else { |
| |
| Collection<Fragment> scanFragments = |
| space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); |
| if (scanFragments != null) { |
| rightFragments.addAll(scanFragments); |
| } |
| |
| } |
| } |
| } |
| Stage.scheduleFragment(stage, fragments[0], rightFragments); |
| |
| // Assign partitions to tasks in a round robin manner. |
| for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry |
| : hashEntries.entrySet()) { |
| addJoinShuffle(stage, entry.getKey(), entry.getValue()); |
| } |
| |
| schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum)); |
| schedulerContext.setEstimatedTaskNum(joinTaskNum); |
| } |
| |
| /** |
   * Merges intermediate entries that share the same execution block id and pull host,
   * summing their volumes.
   * @param hashEntries a map of partition id to intermediate entries grouped by execution block id
   * @return the merged map in the same shape as the input;
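   *         e.g. (hypothetical volumes):
   *         input : 1 -> { eb_1 -> [ (host_A, 10), (host_A, 5), (host_B, 7) ] }
   *         output: 1 -> { eb_1 -> [ (host_A, 15), (host_B, 7) ] }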
| */ |
| public static Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergeIntermediateByPullHost( |
| Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries) { |
| Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries = |
| new HashMap<>(); |
| |
| for(Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry: hashEntries.entrySet()) { |
| Integer partId = entry.getKey(); |
| for (Entry<ExecutionBlockId, List<IntermediateEntry>> partEntry: entry.getValue().entrySet()) { |
| List<IntermediateEntry> intermediateList = partEntry.getValue(); |
| if (intermediateList == null || intermediateList.isEmpty()) { |
| continue; |
| } |
| ExecutionBlockId ebId = partEntry.getKey(); |
| // EBID + PullHost -> IntermediateEntry |
      // In the case of union, partEntry.getKey() returns the delegated EBID.
| // Intermediate entries are merged by real EBID. |
| Map<String, IntermediateEntry> ebMerged = new HashMap<>(); |
| |
| for (IntermediateEntry eachIntermediate: intermediateList) { |
| String ebMergedKey = eachIntermediate.getEbId().toString() + eachIntermediate.getPullHost().getPullAddress(); |
| IntermediateEntry intermediateEntryPerPullHost = ebMerged.get(ebMergedKey); |
| if (intermediateEntryPerPullHost == null) { |
| intermediateEntryPerPullHost = new IntermediateEntry(-1, -1, partId, eachIntermediate.getPullHost()); |
| intermediateEntryPerPullHost.setEbId(eachIntermediate.getEbId()); |
| ebMerged.put(ebMergedKey, intermediateEntryPerPullHost); |
| } |
| intermediateEntryPerPullHost.setVolume(intermediateEntryPerPullHost.getVolume() + eachIntermediate.getVolume()); |
| } |
| |
| List<IntermediateEntry> ebIntermediateEntries = new ArrayList<>(ebMerged.values()); |
| |
| Map<ExecutionBlockId, List<IntermediateEntry>> mergedPartEntries = mergedHashEntries.get(partId); |
| if (mergedPartEntries == null) { |
| mergedPartEntries = new HashMap<>(); |
| mergedHashEntries.put(partId, mergedPartEntries); |
| } |
| mergedPartEntries.put(ebId, ebIntermediateEntries); |
| } |
| } |
| return mergedHashEntries; |
| } |
| |
| /** |
   * Creates fragments for all partitions of a partitioned table.
| */ |
| public static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler, |
| ScanNode scan, |
| TableDesc table) throws IOException { |
| Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); |
| if (!(scan instanceof PartitionedTableScanNode)) { |
| throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); |
| } |
| List<Fragment> fragments = Lists.newArrayList(); |
| PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; |
| fragments.addAll(((FileTablespace) tsHandler).getSplits( |
| scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); |
| partitionsScan.setInputPaths(null); |
| return fragments; |
| } |
| |
| private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage, |
| int baseScanId, Fragment[] fragments) |
| throws IOException, TajoException { |
| ExecutionBlock execBlock = stage.getBlock(); |
| ScanNode[] scans = execBlock.getScanNodes(); |
| |
| for (int i = 0; i < scans.length; i++) { |
| if (i != baseScanId) { |
| scans[i].setBroadcastTable(true); |
| } |
| } |
| |
    // Large table (baseScan)
    //  -> add all fragments to baseFragments
    //  -> each fragment is assigned to a Task by DefaultTaskScheduler.handle()
    // Broadcast table
    //  all fragments or paths are assigned to every scan task of the large table.
    //  -> PARTITIONS_SCAN
    //     . add all partition paths to the node's inputPaths variable
    //  -> SCAN
    //     . add all fragments to broadcastFragments
| Collection<Fragment> baseFragments = null; |
| List<Fragment> broadcastFragments = new ArrayList<>(); |
| for (int i = 0; i < scans.length; i++) { |
| ScanNode scan = scans[i]; |
| TableDesc desc = stage.getContext().getTableDesc(scan); |
| |
| Collection<Fragment> scanFragments; |
| Path[] partitionScanPaths = null; |
| |
| |
| Tablespace space = TablespaceManager.get(desc.getUri()); |
| |
| if (scan.getType() == NodeType.PARTITIONS_SCAN) { |
| PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan; |
| partitionScanPaths = partitionScan.getInputPaths(); |
        // getFragmentsFromPartitionedTable() sets inputPaths to null, so restore the saved paths afterwards
| scanFragments = getFragmentsFromPartitionedTable(space, scan, desc); |
| } else { |
| scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual()); |
| } |
| |
| if (scanFragments != null) { |
| if (i == baseScanId) { |
| baseFragments = scanFragments; |
| } else { |
| if (scan.getType() == NodeType.PARTITIONS_SCAN) { |
| PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; |
            // PhysicalPlanner creates a PartitionMergeScanExec when the table is a broadcast table and inputPaths is not empty
| partitionScan.setInputPaths(partitionScanPaths); |
| } else { |
| broadcastFragments.addAll(scanFragments); |
| } |
| } |
| } |
| } |
| |
| if (baseFragments == null) { |
| throw new IOException("No fragments for " + scans[baseScanId].getTableName()); |
| } |
| |
| Stage.scheduleFragments(stage, baseFragments, broadcastFragments); |
| schedulerContext.setEstimatedTaskNum(baseFragments.size()); |
| } |
| |
| private static void addJoinShuffle(Stage stage, int partitionId, |
                                     Map<ExecutionBlockId, List<IntermediateEntry>> groupedPartitions) {
| Map<String, List<FetchProto>> fetches = new HashMap<>(); |
| for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) { |
      if (groupedPartitions.containsKey(execBlock.getId())) {
| String name = execBlock.getId().toString(); |
        List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE,
            groupedPartitions.get(execBlock.getId()));
| fetches.put(name, requests); |
| } |
| } |
| |
| if (fetches.isEmpty()) { |
      LOG.info(stage.getId() + "'s partition " + partitionId + " has an empty result.");
| return; |
| } |
| Stage.scheduleFetches(stage, fetches); |
| } |
| |
| /** |
   * This method merges the partition requests associated with the same pull server address.
   * It reduces the number of TCP connections.
   *
   * @return a list of merged fetch requests, one per (execution block, pull server) pair;
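   *         e.g. (hypothetical): two IntermediateEntries from eb_1 on host A and one from
   *         eb_1 on host B are merged into two FetchProtos, one per host, with the host A
   *         fetch carrying both task parts.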
| */ |
| private static List<FetchProto> mergeShuffleRequest(final String fetchName, |
| final int partitionId, |
| final ShuffleType type, |
| final List<IntermediateEntry> partitions) { |
    // ebId + pullhost -> FetchImpl
| Map<String, FetchImpl> mergedPartitions = new HashMap<>(); |
| |
| for (IntermediateEntry partition : partitions) { |
| String mergedKey = partition.getEbId().toString() + "," + partition.getPullHost(); |
| |
| if (mergedPartitions.containsKey(mergedKey)) { |
| FetchImpl fetch = mergedPartitions.get(mergedKey); |
| fetch.addPart(partition.getTaskId(), partition.getAttemptId()); |
| } else { |
| // In some cases like union each IntermediateEntry has different EBID. |
| FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId); |
| fetch.addPart(partition.getTaskId(), partition.getAttemptId()); |
| mergedPartitions.put(mergedKey, fetch); |
| } |
| } |
| |
| return mergedPartitions.values().stream() |
| .map(fetch -> fetch.getProto()) |
| .collect(Collectors.toList()); |
| } |
| |
| public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext, |
| MasterPlan masterPlan, Stage stage, int maxNum) |
| throws IOException { |
| DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0); |
| if (channel.isHashShuffle()) { |
| scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); |
| } else if (channel.isRangeShuffle()) { |
| scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); |
| } else { |
      throw new TajoInternalError("Unsupported partition type");
| } |
| } |
| |
| private static TableStats computeChildBlocksStats(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, |
| ExecutionBlockId parentBlockId) { |
| List<TableStats> tableStatses = new ArrayList<>(); |
| List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId); |
| for (ExecutionBlock childBlock : childBlocks) { |
| Stage childStage = context.getStage(childBlock.getId()); |
| tableStatses.add(childStage.getResultStats()); |
| } |
| return StatisticsUtil.aggregateTableStat(tableStatses); |
| } |
| |
| public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, |
| Stage stage, DataChannel channel, int maxNum) |
| throws IOException { |
| ExecutionBlock execBlock = stage.getBlock(); |
| ScanNode scan = execBlock.getScanNodes()[0]; |
| |
| ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0); |
| SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT); |
| SortSpec [] sortSpecs = sortNode.getSortKeys(); |
| Schema sortSchema = SchemaFactory.newV1(channel.getShuffleKeys()); |
| |
| TupleRange[] ranges; |
| int determinedTaskNum; |
| |
| // calculate the number of maximum query ranges |
| TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); |
| |
    // If there is an empty table in an inner join, it should return zero rows.
| if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) { |
| return; |
| } |
| TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); |
| |
| if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) { |
| String dataFormat = PlannerUtil.getDataFormat(masterPlan.getLogicalPlan()); |
| CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); |
| LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot(); |
| TableDesc tableDesc = null; |
| try { |
| tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); |
| } catch (UndefinedTableException e) { |
| throw new IOException("Can't get table meta data from catalog: " + |
| PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); |
| } |
| |
| Tablespace space = TablespaceManager.getAnyByScheme(dataFormat).get(); |
| ranges = space.getInsertSortRanges( |
| stage.getContext().getQueryContext(), |
| tableDesc, |
| sortNode.getInSchema(), |
| sortSpecs, |
| mergedRange); |
| |
| determinedTaskNum = ranges.length; |
| } else { |
| RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); |
| BigInteger card = partitioner.getTotalCardinality(); |
| |
      // if the range cardinality is less than the desired number of tasks,
      // we set the number of tasks to the range cardinality.
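      // e.g. (hypothetical): a range over a single integer column spanning [0, 99] has a
      // cardinality of 100; if maxNum is 150, only 100 tasks are created.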
| if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { |
        LOG.info(stage.getId() + ", The range cardinality (" + card
            + ") is less than the desired number of tasks (" + maxNum + ")");
| determinedTaskNum = card.intValue(); |
| } else { |
| determinedTaskNum = maxNum; |
| } |
| |
| LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + |
| " sub ranges (total units: " + determinedTaskNum + ")"); |
| ranges = partitioner.partition(determinedTaskNum); |
| if (ranges == null) { |
| throw new NullPointerException("ranges is null on " + stage.getId() + " stage."); |
| } |
| |
| if (ranges.length == 0) { |
        LOG.warn(stage.getId() + " has no range info.");
| } |
| |
| TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); |
| if (LOG.isDebugEnabled()) { |
| for (TupleRange eachRange : ranges) { |
| LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); |
| } |
| } |
| } |
| |
| // TODO - We should remove dummy fragment. |
| FileFragment dummyFragment = new FileFragment(scan.getTableName(), new Path("/dummy"), 0, 0, |
| new String[]{UNKNOWN_HOST}); |
| Stage.scheduleFragment(stage, dummyFragment); |
| |
| Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>(); |
| List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId()); |
| for (ExecutionBlock childBlock : childBlocks) { |
| Stage childExecSM = stage.getContext().getStage(childBlock.getId()); |
| for (Task qu : childExecSM.getTasks()) { |
| for (IntermediateEntry p : qu.getIntermediateData()) { |
| Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId()); |
| if (fetches.containsKey(key)) { |
| fetches.get(key).addPart(p.getTaskId(), p.getAttemptId()); |
| } else { |
| FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); |
| fetch.addPart(p.getTaskId(), p.getAttemptId()); |
| fetches.put(key, fetch); |
| } |
| } |
| } |
| } |
| |
| SortedMap<TupleRange, Collection<FetchProto>> map; |
| map = new TreeMap<>(); |
| |
| Set<FetchProto> fetchSet; |
| RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema); |
| for (int i = 0; i < ranges.length; i++) { |
| fetchSet = new HashSet<>(); |
| RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder); |
| for (FetchImpl fetch : fetches.values()) { |
| FetchImpl copy = null; |
| try { |
| copy = fetch.clone(); |
| } catch (CloneNotSupportedException e) { |
| throw new RuntimeException(e); |
| } |
| copy.setRangeParams(rangeParam); |
| fetchSet.add(copy.getProto()); |
| } |
| |
| map.put(ranges[i], fetchSet); |
| } |
| |
| scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum); |
| |
| schedulerContext.setEstimatedTaskNum(determinedTaskNum); |
| } |
| |
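  /**
   * Assigns each partition's fetches to one of {@code num} task buckets in a round-robin
   * manner and schedules one task per bucket.
   * e.g. (hypothetical): partitions p0..p4 with num = 2 are grouped as
   * {p0, p2, p4} for the first task and {p1, p3} for the second.
   */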
| public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions, |
| String tableName, int num) { |
| int i; |
| Map<String, List<FetchProto>>[] fetchesArray = new Map[num]; |
| for (i = 0; i < num; i++) { |
| fetchesArray[i] = new HashMap<>(); |
| } |
| i = 0; |
| for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) { |
| Collection<FetchProto> value = entry.getValue(); |
| TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value); |
| if (i == num) i = 0; |
| } |
| for (Map<String, List<FetchProto>> eachFetches : fetchesArray) { |
| Stage.scheduleFetches(stage, eachFetches); |
| } |
| } |
| |
| @VisibleForTesting |
| public static class FetchGroupMeta { |
| long totalVolume; |
| List<FetchImpl> fetchUrls; |
| |
| public FetchGroupMeta(long volume, FetchImpl fetchUrls) { |
| this.totalVolume = volume; |
| this.fetchUrls = Lists.newArrayList(fetchUrls); |
| } |
| |
    public FetchGroupMeta addFetche(FetchImpl fetch) {
      this.fetchUrls.add(fetch);
| return this; |
| } |
| |
| public void increaseVolume(long volume) { |
| this.totalVolume += volume; |
| } |
| |
| public long getVolume() { |
| return totalVolume; |
| } |
| |
| public List<FetchProto> getFetchProtos() { |
| return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList()); |
| } |
| |
| } |
| |
| public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, |
| Stage stage, DataChannel channel, |
| int maxNum) throws IOException { |
| ExecutionBlock execBlock = stage.getBlock(); |
| ScanNode scan = execBlock.getScanNodes()[0]; |
| |
| // TODO - We should remove dummy fragment usages |
| Fragment frag = new FileFragment(scan.getCanonicalName(), new Path("/dummy"), 0, 0, new String[]{UNKNOWN_HOST}); |
| List<Fragment> fragments = new ArrayList<>(); |
| fragments.add(frag); |
| Stage.scheduleFragments(stage, fragments); |
| |
| Map<Integer, FetchGroupMeta> finalFetches = new HashMap<>(); |
| Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<>(); |
| |
| for (ExecutionBlock block : masterPlan.getChilds(execBlock)) { |
| List<IntermediateEntry> partitions = new ArrayList<>(); |
| partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries()); |
| |
      // In scattered hash shuffle, collect each IntermediateEntry
| if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { |
| if (intermediates.containsKey(block.getId())) { |
| intermediates.get(block.getId()).addAll(partitions); |
| } else { |
| intermediates.put(block.getId(), partitions); |
| } |
| } |
| |
| // make FetchImpl per PullServer, PartId |
| Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions); |
| for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) { |
| Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue()); |
| for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) { |
| |
| FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(), |
| block.getId(), interm.getKey(), e.getValue()); |
| |
| long volumeSum = 0; |
| for (IntermediateEntry ie : e.getValue()) { |
| volumeSum += ie.getVolume(); |
| } |
| |
| if (finalFetches.containsKey(interm.getKey())) { |
| finalFetches.get(interm.getKey()).addFetche(fetch).increaseVolume(volumeSum); |
| } else { |
| finalFetches.put(interm.getKey(), new FetchGroupMeta(volumeSum, fetch)); |
| } |
| } |
| } |
| } |
| |
| int groupingColumns = 0; |
| LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(), |
| new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY}); |
| if (groupbyNodes != null && groupbyNodes.length > 0) { |
| LogicalNode bottomNode = groupbyNodes[0]; |
| if (bottomNode.getType() == NodeType.GROUP_BY) { |
| groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length; |
| } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) { |
| DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); |
| if (distinctNode == null) { |
| LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); |
| distinctNode = (DistinctGroupbyNode)bottomNode; |
| } |
| groupingColumns = distinctNode.getGroupingColumns().length; |
| |
| Enforcer enforcer = execBlock.getEnforcer(); |
| EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); |
| if (property != null) { |
| if (property.getDistinct().getIsMultipleAggregation()) { |
| MultipleAggregationStage mulAggStage = property.getDistinct().getMultipleAggregationStage(); |
| if (mulAggStage != MultipleAggregationStage.THRID_STAGE) { |
| groupingColumns = distinctNode.getOutSchema().size(); |
| } |
| } |
| } |
| } |
| } |
| // get a proper number of tasks |
| int determinedTaskNum = Math.min(maxNum, finalFetches.size()); |
| LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); |
| |
| if (groupingColumns == 0) { |
| determinedTaskNum = 1; |
| LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); |
| } else { |
| TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); |
| if (totalStat.getNumRows() == 0) { |
| determinedTaskNum = 1; |
| } |
| } |
| |
| // set the proper number of tasks to the estimated task num |
| if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { |
| scheduleScatteredHashShuffleFetches(schedulerContext, stage, intermediates, |
| scan.getTableName()); |
| } else { |
| schedulerContext.setEstimatedTaskNum(determinedTaskNum); |
      // divide fetch URIs into the proper number of tasks according to their volumes
| scheduleFetchesByEvenDistributedVolumes(stage, finalFetches, scan.getTableName(), determinedTaskNum); |
| LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum); |
| } |
| } |
| |
| public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl( |
| Map<Integer, FetchGroupMeta> partitions, String tableName, int num) { |
| |
| // Sort fetchGroupMeta in a descending order of data volumes. |
| List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values()); |
    Collections.sort(fetchGroupMetaList, (o1, o2) -> Long.compare(o2.getVolume(), o1.getVolume()));
| |
| // Initialize containers |
| Map<String, List<FetchProto>>[] fetchesArray = new Map[num]; |
| Long [] assignedVolumes = new Long[num]; |
| // initialization |
| for (int i = 0; i < num; i++) { |
| fetchesArray[i] = new HashMap<>(); |
      assignedVolumes[i] = 0L;
| } |
| |
    // This algorithm assigns the largest group first by using a sorted iterator; it is a greedy strategy.
    // Its complexity is O(n). Since there can be tens of thousands of fetch groups, complexity matters.
    // In that respect, it shows reasonable performance and results even though it is not an optimal algorithm.
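    // e.g. (hypothetical volumes) with num = 3 and sorted groups [9, 7, 6, 4, 3, 1]:
    //   forward pass : task0 = 9, task1 = 7, task2 = 6
    //   backward pass: task2 += 4 (=10), task1 += 3 (=10), task0 += 1 (=10)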
| Iterator<FetchGroupMeta> iterator = fetchGroupMetaList.iterator(); |
| |
| int p; |
| while(iterator.hasNext()) { |
| p = 0; |
| while (p < num && iterator.hasNext()) { |
| FetchGroupMeta fetchGroupMeta = iterator.next(); |
| assignedVolumes[p] += fetchGroupMeta.getVolume(); |
| |
| TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos()); |
| p++; |
| } |
| |
| p = num - 1; |
| while (p >= 0 && iterator.hasNext()) { |
| FetchGroupMeta fetchGroupMeta = iterator.next(); |
| assignedVolumes[p] += fetchGroupMeta.getVolume(); |
| TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos()); |
| |
        // While the current bucket has received less volume than the previous one, add additional fetches to it.
| while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) { |
| FetchGroupMeta additionalFetchGroup = iterator.next(); |
| assignedVolumes[p] += additionalFetchGroup.getVolume(); |
| TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos()); |
| } |
| |
| p--; |
| } |
| } |
| |
| return new Pair<>(assignedVolumes, fetchesArray); |
| } |
| |
| public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions, |
| String tableName, int num) { |
| Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); |
| // Schedule FetchImpls |
| for (Map<String, List<FetchProto>> eachFetches : fetchsArray) { |
| Stage.scheduleFetches(stage, eachFetches); |
| } |
| } |
| |
  // Scattered hash shuffle hashes the key columns and groups the intermediate data associated with
  // the same hash key. Then, if the volume of a group is larger
  // than $DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into two or more sub groups
  // according to $DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB).
  // As a result, each group size always becomes less than or equal
  // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each sub group is assigned to a query unit.
  // It is usually used for writing partitioned tables.
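  // e.g. (hypothetical): with a 256MB partition volume, a 600MB group is divided into
  // sub groups of 256MB, 256MB, and 88MB, and each sub group becomes one task.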
| public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, |
| Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, |
| String tableName) { |
| long splitVolume = (long)StorageUnit.MB * |
| stage.getMasterPlan().getContext().getInt(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); |
| long pageSize = ((long)StorageUnit.MB) * |
| stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes |
| if (pageSize >= splitVolume) { |
      throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than " +
          "tajo.shuffle.hash.appender.page.volumn-mb");
| } |
| List<List<FetchProto>> fetches = new ArrayList<>(); |
| |
| long totalIntermediateSize = 0L; |
| for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) { |
| // merge by PartitionId |
| Map<Integer, List<IntermediateEntry>> partitionIntermMap = new HashMap<>(); |
| for (IntermediateEntry eachInterm: listEntry.getValue()) { |
| totalIntermediateSize += eachInterm.getVolume(); |
| int partId = eachInterm.getPartId(); |
| List<IntermediateEntry> partitionInterms = partitionIntermMap.get(partId); |
| if (partitionInterms == null) { |
| partitionInterms = Lists.newArrayList(eachInterm); |
| partitionIntermMap.put(partId, partitionInterms); |
| } else { |
| partitionInterms.add(eachInterm); |
| } |
| } |
| |
| // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size |
| for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) { |
| List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries, |
| splitVolume, pageSize); |
| if (eachFetches != null && !eachFetches.isEmpty()) { |
| fetches.addAll(eachFetches); |
| } |
| } |
| } |
| |
| schedulerContext.setEstimatedTaskNum(fetches.size()); |
| |
| int i = 0; |
| Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()]; |
| for(List<FetchProto> entry : fetches) { |
| fetchesArray[i] = new HashMap<>(); |
| fetchesArray[i].put(tableName, entry); |
| |
| Stage.scheduleFetches(stage, fetchesArray[i]); |
| i++; |
| } |
| |
| LOG.info(stage.getId() |
| + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name() |
| + ", Intermediate Size: " + totalIntermediateSize |
| + ", splitSize: " + splitVolume |
| + ", DeterminedTaskNum: " + fetches.size()); |
| } |
| |
| /** |
   * Merges or splits intermediate entries so that each resulting List<FetchProto>
   * amounts to roughly splitVolume bytes. If a single IntermediateEntry is larger than
   * splitVolume, it is split into multiple fetches.
   * @param fetchName
   * @param ebId
   * @param entries
   * @param splitVolume
   * @param pageSize
   * @return a list of fetch lists, each of which is assigned to a single task;
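   *         e.g. (hypothetical): with splitVolume = 256MB, entries of 300MB and 100MB yield
   *         [ [300MB entry: offset 0, length 256MB],
   *           [300MB entry: offset 256MB, length 44MB; the whole 100MB entry] ]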
| */ |
| public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull String fetchName, |
| ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) { |
    // Each List<FetchProto> amounts to at most approximately splitVolume bytes.
| List<List<FetchProto>> fetches = new ArrayList<>(); |
| |
| Iterator<IntermediateEntry> iter = entries.iterator(); |
| if (!iter.hasNext()) { |
| return null; |
| } |
| List<FetchProto> fetchListForSingleTask = new ArrayList<>(); |
| long fetchListVolume = 0; |
| |
| while (iter.hasNext()) { |
| IntermediateEntry currentInterm = iter.next(); |
| |
| long firstSplitVolume = splitVolume - fetchListVolume; |
| if (firstSplitVolume < pageSize) { |
| firstSplitVolume = splitVolume; |
| } |
| |
      // Each Pair object in the splits variable is assigned to the next ExecutionBlock's task.
      // The first long value is an offset in the intermediate file and the second is its length.
| List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume); |
| if (splits == null || splits.isEmpty()) { |
| break; |
| } |
| |
| for (Pair<Long, Long> eachSplit: splits) { |
| if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) { |
| if (!fetchListForSingleTask.isEmpty()) { |
| fetches.add(fetchListForSingleTask); |
| } |
| fetchListForSingleTask = new ArrayList<>(); |
| fetchListVolume = 0; |
| } |
| FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE, |
| ebId, currentInterm.getPartId(), Lists.newArrayList(currentInterm)); |
| fetch.setOffset(eachSplit.getFirst()); |
| fetch.setLength(eachSplit.getSecond()); |
| fetchListForSingleTask.add(fetch.getProto()); |
| fetchListVolume += eachSplit.getSecond(); |
| } |
| } |
| if (!fetchListForSingleTask.isEmpty()) { |
| fetches.add(fetchListForSingleTask); |
| } |
| return fetches; |
| } |
| |
| /** |
| * Get the pull server URIs. |
| */ |
| public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) { |
| return createFetchURL(maxUrlLength, fetch, true); |
| } |
| |
| /** |
| * Get the pull server URIs without repeated parameters. |
| */ |
| public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) { |
| return createFetchURL(maxUrlLength, fetch, false); |
| } |
| |
| private static String getRangeParam(FetchProto proto) { |
| StringBuilder sb = new StringBuilder(); |
| String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray())); |
| String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray())); |
| |
| try { |
| sb.append("start=") |
| .append(URLEncoder.encode(firstKeyBase64, "utf-8")) |
| .append("&") |
| .append("end=") |
| .append(URLEncoder.encode(lastKeyBase64, "utf-8")); |
| } catch (UnsupportedEncodingException e) { |
| throw new RuntimeException(e); |
| } |
| |
| if (proto.getRangeLastInclusive()) { |
| sb.append("&final=true"); |
| } |
| |
| return sb.toString(); |
| } |
| |
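  /**
   * Builds the pull server fetch URLs for the given fetch request.
   * An illustrative (hypothetical) hash shuffle URL: http://host:port/?qid=q_xxx&sid=2&p=3&type=h
   * For range shuffle, the range parameters and a ta parameter carrying taskId_attemptId pairs
   * are appended; the ta list is split across multiple URLs when a URL would exceed maxUrlLength.
   */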
| public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) { |
| String scheme = "http://"; |
| |
| StringBuilder urlPrefix = new StringBuilder(scheme); |
| ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId()); |
| urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?") |
| .append("qid=").append(ebId.getQueryId().toString()) |
| .append("&sid=").append(ebId.getId()) |
| .append("&p=").append(fetch.getPartitionId()) |
| .append("&type="); |
| if (fetch.getType() == HASH_SHUFFLE) { |
| urlPrefix.append("h"); |
| } else if (fetch.getType() == RANGE_SHUFFLE) { |
| urlPrefix.append("r").append("&").append(getRangeParam(fetch)); |
| } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) { |
| urlPrefix.append("s"); |
| } |
| |
| if (fetch.getLength() >= 0) { |
| urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength()); |
| } |
| |
| List<URI> fetchURLs = new ArrayList<>(); |
| if(includeParts) { |
| if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) { |
| fetchURLs.add(URI.create(urlPrefix.toString())); |
| } else { |
| urlPrefix.append("&ta="); |
        // If the GET request is longer than 2000 characters,
        // the long request URI may cause HTTP status code 414 (Request-URI Too Long).
        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
        // The code below transforms a long request into multiple requests.
| List<String> taskIdsParams = new ArrayList<>(); |
| StringBuilder taskIdListBuilder = new StringBuilder(); |
| |
| final List<Integer> taskIds = fetch.getTaskIdList(); |
| final List<Integer> attemptIds = fetch.getAttemptIdList(); |
| |
| // Sort task ids to increase cache hit in pull server |
| final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size()) |
| .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i))) |
| .sorted((p1, p2) -> p1.getFirst() - p2.getFirst()) |
| .collect(Collectors.toList()); |
| |
| boolean first = true; |
| |
| for (int i = 0; i < taskAndAttemptIds.size(); i++) { |
| StringBuilder taskAttemptId = new StringBuilder(); |
| |
          if (!first) { // add a comma separator between task attempt ids
| taskAttemptId.append(","); |
| } else { |
| first = false; |
| } |
| |
| int taskId = taskAndAttemptIds.get(i).getFirst(); |
| if (taskId < 0) { |
            // In the case of hash shuffle, each partition has a single shuffle file per worker.
| // TODO If file is large, consider multiple fetching(shuffle file can be split) |
| continue; |
| } |
| int attemptId = taskAndAttemptIds.get(i).getSecond(); |
| taskAttemptId.append(taskId).append("_").append(attemptId); |
| |
| if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) { |
| taskIdsParams.add(taskIdListBuilder.toString()); |
| taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId); |
| } else { |
| taskIdListBuilder.append(taskAttemptId); |
| } |
| } |
| // if the url params remain |
| if (taskIdListBuilder.length() > 0) { |
| taskIdsParams.add(taskIdListBuilder.toString()); |
| } |
| for (String param : taskIdsParams) { |
| fetchURLs.add(URI.create(urlPrefix + param)); |
| } |
| } |
| } else { |
| fetchURLs.add(URI.create(urlPrefix.toString())); |
| } |
| |
| return fetchURLs; |
| } |
| |
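  /**
   * Groups intermediate entries by partition id.
   * e.g. (hypothetical): entries with part ids [0, 1, 0] become {0: [e0, e2], 1: [e1]}.
   */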
| public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) { |
| Map<Integer, List<IntermediateEntry>> hashed = new HashMap<>(); |
| for (IntermediateEntry entry : entries) { |
| if (hashed.containsKey(entry.getPartId())) { |
| hashed.get(entry.getPartId()).add(entry); |
| } else { |
| hashed.put(entry.getPartId(), Lists.newArrayList(entry)); |
| } |
| } |
| |
| return hashed; |
| } |
| |
| public static Map<Task.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) { |
| Map<Task.PullHost, List<IntermediateEntry>> hashed = new HashMap<>(); |
| |
| Task.PullHost host; |
| for (IntermediateEntry entry : entries) { |
| host = entry.getPullHost(); |
| if (hashed.containsKey(host)) { |
| hashed.get(host).add(entry); |
| } else { |
| hashed.put(host, Lists.newArrayList(entry)); |
| } |
| } |
| |
| return hashed; |
| } |
| |
| public static Stage setShuffleOutputNumForTwoPhase(Stage stage, final int desiredNum, DataChannel channel) { |
| ExecutionBlock execBlock = stage.getBlock(); |
| Column[] keys; |
    // if the parent block is a join,
    // set the shuffle output number for the current block
    // TODO: the union handling is required when a join has unions as its child
| MasterPlan masterPlan = stage.getMasterPlan(); |
| keys = channel.getShuffleKeys(); |
| if (!masterPlan.isRoot(stage.getBlock()) ) { |
| ExecutionBlock parentBlock = masterPlan.getParent(stage.getBlock()); |
| if (parentBlock.getPlan().getType() == NodeType.JOIN) { |
| channel.setShuffleOutputNum(desiredNum); |
| } |
| } |
| |
| // set the partition number for group by and sort |
| if (channel.isHashShuffle()) { |
| if (execBlock.getPlan().getType() == NodeType.GROUP_BY || |
| execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) { |
| keys = channel.getShuffleKeys(); |
| } |
| } else if (channel.isRangeShuffle()) { |
| if (execBlock.getPlan().getType() == NodeType.SORT) { |
| SortNode sort = (SortNode) execBlock.getPlan(); |
| keys = new Column[sort.getSortKeys().length]; |
| for (int i = 0; i < keys.length; i++) { |
| keys[i] = sort.getSortKeys()[i].getSortKey(); |
| } |
| } |
| } |
| if (keys != null) { |
| if (keys.length == 0) { |
| channel.setShuffleKeys(new Column[]{}); |
| channel.setShuffleOutputNum(1); |
| } else { |
| channel.setShuffleKeys(keys); |
| // NOTE: desiredNum is not used in Sort anymore. |
| channel.setShuffleOutputNum(desiredNum); |
| } |
| } |
| return stage; |
| } |
| } |