| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.iterate; |
| |
| import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS; |
| |
| import java.sql.SQLException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.phoenix.compile.QueryPlan; |
| import org.apache.phoenix.job.JobManager.JobCallable; |
| import org.apache.phoenix.monitoring.CombinableMetric; |
| import org.apache.phoenix.monitoring.MetricType; |
| import org.apache.phoenix.monitoring.ReadMetricQueue; |
| import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; |
| import org.apache.phoenix.trace.util.Tracing; |
| import org.apache.phoenix.util.LogUtil; |
| import org.apache.phoenix.util.ScanUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Lists; |
/**
 * Class that parallelizes a scan over a table using the {@link java.util.concurrent.ExecutorService}
 * provided by the query services. Each region of the table is scanned in parallel, with the
 * results accessible through {@link #getIterators()}.
 *
 * @since 0.1
 */
| public class ParallelIterators extends BaseResultIterators { |
| private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class); |
| private static final String NAME = "PARALLEL"; |
| private final ParallelIteratorFactory iteratorFactory; |
| private final boolean initFirstScanOnly; |
| |
| public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly) |
| throws SQLException { |
| super(plan, perScanLimit, null, scanGrouper, scan); |
| this.iteratorFactory = iteratorFactory; |
| this.initFirstScanOnly = initFirstScanOnly; |
| } |
| |
| public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion) |
| throws SQLException { |
| this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion); |
| } |
| |
| @Override |
| protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, |
| final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException { |
| // Pre-populate nestedFutures lists so that we can shuffle the scans |
| // and add the future to the right nested list. By shuffling the scans |
| // we get better utilization of the cluster since our thread executor |
| // will spray the scans across machines as opposed to targeting a |
| // single one since the scans are in row key order. |
| ExecutorService executor = context.getConnection().getQueryServices().getExecutor(); |
| List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize); |
| for (int i = 0; i < nestedScans.size(); i++) { |
| List<Scan> scans = nestedScans.get(i); |
| int numScans = scans.size(); |
| List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(numScans); |
| nestedFutures.add(futures); |
| for (int j = 0; j < numScans; j++) { |
| Scan scan = nestedScans.get(i).get(j); |
| scanLocations.add(new ScanLocator(scan, i, j, j == 0, (j == numScans - 1))); |
| futures.add(null); // placeholder |
| } |
| } |
| // Shuffle so that we start execution across many machines |
| // before we fill up the thread pool |
| Collections.shuffle(scanLocations); |
| ReadMetricQueue readMetrics = context.getReadMetricsQueue(); |
| final String physicalTableName = tableRef.getTable().getPhysicalName().getString(); |
| int numScans = scanLocations.size(); |
| context.getOverallQueryMetrics().updateNumParallelScans(numScans); |
| GLOBAL_NUM_PARALLEL_SCANS.update(numScans); |
| final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); |
| for (final ScanLocator scanLocation : scanLocations) { |
| final Scan scan = scanLocation.getScan(); |
| final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); |
| final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); |
| final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); |
| context.getConnection().addIteratorForLeaseRenewal(tableResultItr); |
| Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { |
| |
| @Override |
| public PeekingResultIterator call() throws Exception { |
| long startTime = System.currentTimeMillis(); |
| if (logger.isDebugEnabled()) { |
| logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); |
| } |
| PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan); |
| if (initFirstScanOnly) { |
| if ((!isReverse && scanLocation.isFirstScan()) || (isReverse && scanLocation.isLastScan())) { |
| // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed. |
| iterator.peek(); |
| } |
| } else { |
| iterator.peek(); |
| } |
| allIterators.add(iterator); |
| return iterator; |
| } |
| |
| /** |
| * Defines the grouping for round robin behavior. All threads spawned to process |
| * this scan will be grouped together and time sliced with other simultaneously |
| * executing parallel scans. |
| */ |
| @Override |
| public Object getJobId() { |
| return ParallelIterators.this; |
| } |
| |
| @Override |
| public TaskExecutionMetricsHolder getTaskExecutionMetric() { |
| return taskMetrics; |
| } |
| }, "Parallel scanner for table: " + tableRef.getTable().getPhysicalName().getString())); |
| // Add our future in the right place so that we can concatenate the |
| // results of the inner futures versus merge sorting across all of them. |
| nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future)); |
| } |
| } |
| |
| @Override |
| protected String getName() { |
| return NAME; |
| } |
| } |