| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.core.client.mapreduce; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.ScannerBase; |
| import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| |
| /** |
| * This class to holds a batch scan configuration for a table. It contains all the properties needed |
| * to specify how rows should be returned from the table. |
| */ |
| public class InputTableConfig implements Writable { |
| |
| private List<IteratorSetting> iterators; |
| private List<Range> ranges; |
| private Collection<Pair<Text,Text>> columns; |
| |
| private boolean autoAdjustRanges = true; |
| private boolean useLocalIterators = false; |
| private boolean useIsolatedScanners = false; |
| private boolean offlineScan = false; |
| private SamplerConfiguration samplerConfig = null; |
| private Map<String,String> executionHints = Collections.emptyMap(); |
| |
| public InputTableConfig() {} |
| |
| /** |
| * Creates a batch scan config object out of a previously serialized batch scan config object. |
| * |
| * @param input |
| * the data input of the serialized batch scan config |
| */ |
| public InputTableConfig(DataInput input) throws IOException { |
| readFields(input); |
| } |
| |
| /** |
| * Sets the input ranges to scan for all tables associated with this job. This will be added to |
| * any per-table ranges that have been set using |
| * |
| * @param ranges |
| * the ranges that will be mapped over |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setRanges(List<Range> ranges) { |
| this.ranges = ranges; |
| return this; |
| } |
| |
| /** |
| * Returns the ranges to be queried in the configuration |
| */ |
| public List<Range> getRanges() { |
| return ranges != null ? ranges : new ArrayList<>(); |
| } |
| |
| /** |
| * Restricts the columns that will be mapped over for this job for the default input table. |
| * |
| * @param columns |
| * a pair of {@link Text} objects corresponding to column family and column qualifier. If |
| * the column qualifier is null, the entire column family is selected. An empty set is |
| * the default and is equivalent to scanning the all columns. |
| * @since 1.6.0 |
| */ |
| public InputTableConfig fetchColumns(Collection<Pair<Text,Text>> columns) { |
| this.columns = columns; |
| return this; |
| } |
| |
| /** |
| * Returns the columns to be fetched for this configuration |
| */ |
| public Collection<Pair<Text,Text>> getFetchedColumns() { |
| return columns != null ? columns : new HashSet<>(); |
| } |
| |
| /** |
| * Set iterators on to be used in the query. |
| * |
| * @param iterators |
| * the configurations for the iterators |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setIterators(List<IteratorSetting> iterators) { |
| this.iterators = iterators; |
| return this; |
| } |
| |
| /** |
| * Returns the iterators to be set on this configuration |
| */ |
| public List<IteratorSetting> getIterators() { |
| return iterators != null ? iterators : new ArrayList<>(); |
| } |
| |
| /** |
| * Controls the automatic adjustment of ranges for this job. This feature merges overlapping |
| * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause |
| * exactly one Map task to be created for each specified range. The default setting is enabled. * |
| * |
| * <p> |
| * By default, this feature is <b>enabled</b>. |
| * |
| * @param autoAdjustRanges |
| * the feature is enabled if true, disabled otherwise |
| * @see #setRanges(java.util.List) |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) { |
| this.autoAdjustRanges = autoAdjustRanges; |
| return this; |
| } |
| |
| /** |
| * Determines whether a configuration has auto-adjust ranges enabled. |
| * |
| * @return false if the feature is disabled, true otherwise |
| * @since 1.6.0 |
| * @see #setAutoAdjustRanges(boolean) |
| */ |
| public boolean shouldAutoAdjustRanges() { |
| return autoAdjustRanges; |
| } |
| |
| /** |
| * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in |
| * this job. Enabling this feature will cause the iterator stack to be constructed within the Map |
| * task, rather than within the Accumulo TServer. To use this feature, all classes needed for |
| * those iterators must be available on the classpath for the task. |
| * |
| * <p> |
| * By default, this feature is <b>disabled</b>. |
| * |
| * @param useLocalIterators |
| * the feature is enabled if true, disabled otherwise |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setUseLocalIterators(boolean useLocalIterators) { |
| this.useLocalIterators = useLocalIterators; |
| return this; |
| } |
| |
| /** |
| * Determines whether a configuration uses local iterators. |
| * |
| * @return true if the feature is enabled, false otherwise |
| * @since 1.6.0 |
| * @see #setUseLocalIterators(boolean) |
| */ |
| public boolean shouldUseLocalIterators() { |
| return useLocalIterators; |
| } |
| |
| /** |
| * Enable reading offline tables. By default, this feature is disabled and only online tables are |
| * scanned. This will make the map reduce job directly read the table's files. If the table is not |
| * offline, then the job will fail. If the table comes online during the map reduce job, it is |
| * likely that the job will fail. |
| * |
| * <p> |
| * To use this option, the map reduce user will need access to read the Accumulo directory in |
| * HDFS. |
| * |
| * <p> |
| * Reading the offline table will create the scan time iterator stack in the map process. So any |
| * iterators that are configured for the table will need to be on the mapper's classpath. The |
| * accumulo.properties may need to be on the mapper's classpath if HDFS or the Accumulo directory |
| * in HDFS are non-standard. |
| * |
| * <p> |
| * One way to use this feature is to clone a table, take the clone offline, and use the clone as |
| * the input table for a map reduce job. If you plan to map reduce over the data many times, it |
| * may be better to the compact the table, clone it, take it offline, and use the clone for all |
| * map reduce jobs. The reason to do this is that compaction will reduce each tablet in the table |
| * to one file, and it is faster to read from one file. |
| * |
| * <p> |
| * There are two possible advantages to reading a tables file directly out of HDFS. First, you may |
| * see better read performance. Second, it will support speculative execution better. When reading |
| * an online table speculative execution can put more load on an already slow tablet server. |
| * |
| * <p> |
| * By default, this feature is <b>disabled</b>. |
| * |
| * @param offlineScan |
| * the feature is enabled if true, disabled otherwise |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setOfflineScan(boolean offlineScan) { |
| this.offlineScan = offlineScan; |
| return this; |
| } |
| |
| /** |
| * Determines whether a configuration has the offline table scan feature enabled. |
| * |
| * @return true if the feature is enabled, false otherwise |
| * @since 1.6.0 |
| * @see #setOfflineScan(boolean) |
| */ |
| public boolean isOfflineScan() { |
| return offlineScan; |
| } |
| |
| /** |
| * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job. |
| * |
| * <p> |
| * By default, this feature is <b>disabled</b>. |
| * |
| * @param useIsolatedScanners |
| * the feature is enabled if true, disabled otherwise |
| * @since 1.6.0 |
| */ |
| public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) { |
| this.useIsolatedScanners = useIsolatedScanners; |
| return this; |
| } |
| |
| /** |
| * Determines whether a configuration has isolation enabled. |
| * |
| * @return true if the feature is enabled, false otherwise |
| * @since 1.6.0 |
| * @see #setUseIsolatedScanners(boolean) |
| */ |
| public boolean shouldUseIsolatedScanners() { |
| return useIsolatedScanners; |
| } |
| |
| /** |
| * Set the sampler configuration to use when reading from the data. |
| * |
| * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration) |
| * @see InputFormatBase#setSamplerConfiguration(org.apache.hadoop.mapreduce.Job, |
| * SamplerConfiguration) |
| * |
| * @since 1.8.0 |
| */ |
| public void setSamplerConfiguration(SamplerConfiguration samplerConfiguration) { |
| this.samplerConfig = samplerConfiguration; |
| } |
| |
| /** |
| * |
| * @since 1.8.0 |
| */ |
| public SamplerConfiguration getSamplerConfiguration() { |
| return samplerConfig; |
| } |
| |
| /** |
| * The execution hints to set on created scanners. See {@link ScannerBase#setExecutionHints(Map)} |
| * |
| * @since 2.0.0 |
| */ |
| public void setExecutionHints(Map<String,String> executionHints) { |
| this.executionHints = executionHints; |
| } |
| |
| /** |
| * @since 2.0.0 |
| */ |
| public Map<String,String> getExecutionHints() { |
| return executionHints; |
| } |
| |
| @Override |
| public void write(DataOutput dataOutput) throws IOException { |
| if (iterators != null) { |
| dataOutput.writeInt(iterators.size()); |
| for (IteratorSetting setting : iterators) |
| setting.write(dataOutput); |
| } else { |
| dataOutput.writeInt(0); |
| } |
| if (ranges != null) { |
| dataOutput.writeInt(ranges.size()); |
| for (Range range : ranges) |
| range.write(dataOutput); |
| } else { |
| dataOutput.writeInt(0); |
| } |
| if (columns != null) { |
| dataOutput.writeInt(columns.size()); |
| for (Pair<Text,Text> column : columns) { |
| if (column.getSecond() == null) { |
| dataOutput.writeInt(1); |
| column.getFirst().write(dataOutput); |
| } else { |
| dataOutput.writeInt(2); |
| column.getFirst().write(dataOutput); |
| column.getSecond().write(dataOutput); |
| } |
| } |
| } else { |
| dataOutput.writeInt(0); |
| } |
| dataOutput.writeBoolean(autoAdjustRanges); |
| dataOutput.writeBoolean(useLocalIterators); |
| dataOutput.writeBoolean(useIsolatedScanners); |
| dataOutput.writeBoolean(offlineScan); |
| if (samplerConfig == null) { |
| dataOutput.writeBoolean(false); |
| } else { |
| dataOutput.writeBoolean(true); |
| new SamplerConfigurationImpl(samplerConfig).write(dataOutput); |
| } |
| |
| if (executionHints == null || executionHints.size() == 0) { |
| dataOutput.writeInt(0); |
| } else { |
| dataOutput.writeInt(executionHints.size()); |
| for (Entry<String,String> entry : executionHints.entrySet()) { |
| dataOutput.writeUTF(entry.getKey()); |
| dataOutput.writeUTF(entry.getValue()); |
| } |
| } |
| } |
| |
| @Override |
| public void readFields(DataInput dataInput) throws IOException { |
| // load iterators |
| long iterSize = dataInput.readInt(); |
| if (iterSize > 0) |
| iterators = new ArrayList<>(); |
| for (int i = 0; i < iterSize; i++) |
| iterators.add(new IteratorSetting(dataInput)); |
| // load ranges |
| long rangeSize = dataInput.readInt(); |
| if (rangeSize > 0) |
| ranges = new ArrayList<>(); |
| for (int i = 0; i < rangeSize; i++) { |
| Range range = new Range(); |
| range.readFields(dataInput); |
| ranges.add(range); |
| } |
| // load columns |
| long columnSize = dataInput.readInt(); |
| if (columnSize > 0) |
| columns = new HashSet<>(); |
| for (int i = 0; i < columnSize; i++) { |
| long numPairs = dataInput.readInt(); |
| Text colFam = new Text(); |
| colFam.readFields(dataInput); |
| if (numPairs == 1) { |
| columns.add(new Pair<>(colFam, null)); |
| } else if (numPairs == 2) { |
| Text colQual = new Text(); |
| colQual.readFields(dataInput); |
| columns.add(new Pair<>(colFam, colQual)); |
| } |
| } |
| autoAdjustRanges = dataInput.readBoolean(); |
| useLocalIterators = dataInput.readBoolean(); |
| useIsolatedScanners = dataInput.readBoolean(); |
| offlineScan = dataInput.readBoolean(); |
| |
| if (dataInput.readBoolean()) { |
| samplerConfig = new SamplerConfigurationImpl(dataInput).toSamplerConfiguration(); |
| } |
| |
| executionHints = new HashMap<>(); |
| int numHints = dataInput.readInt(); |
| for (int i = 0; i < numHints; i++) { |
| String k = dataInput.readUTF(); |
| String v = dataInput.readUTF(); |
| executionHints.put(k, v); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| InputTableConfig that = (InputTableConfig) o; |
| |
| if (autoAdjustRanges != that.autoAdjustRanges) |
| return false; |
| if (offlineScan != that.offlineScan) |
| return false; |
| if (useIsolatedScanners != that.useIsolatedScanners) |
| return false; |
| if (useLocalIterators != that.useLocalIterators) |
| return false; |
| if (columns != null ? !columns.equals(that.columns) : that.columns != null) |
| return false; |
| if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null) |
| return false; |
| if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null) |
| return false; |
| if (executionHints != null ? !executionHints.equals(that.executionHints) |
| : that.executionHints != null) |
| return false; |
| return samplerConfig != null ? samplerConfig.equals(that.samplerConfig) |
| : that.samplerConfig == null; |
| } |
| |
| @Override |
| public int hashCode() { |
| int result = 31 * (iterators != null ? iterators.hashCode() : 0); |
| result = 31 * result + (ranges != null ? ranges.hashCode() : 0); |
| result = 31 * result + (columns != null ? columns.hashCode() : 0); |
| result = 31 * result + (autoAdjustRanges ? 1 : 0); |
| result = 31 * result + (useLocalIterators ? 1 : 0); |
| result = 31 * result + (useIsolatedScanners ? 1 : 0); |
| result = 31 * result + (offlineScan ? 1 : 0); |
| result = 31 * result + (samplerConfig == null ? 0 : samplerConfig.hashCode()); |
| result = 31 * result + (executionHints == null ? 0 : executionHints.hashCode()); |
| return result; |
| } |
| } |