| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.core.client.mapred; |
| |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.security.SecureRandom; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.BatchScanner; |
| import org.apache.accumulo.core.client.ClientInfo; |
| import org.apache.accumulo.core.client.ClientSideIteratorScanner; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.ScannerBase; |
| import org.apache.accumulo.core.client.TableDeletedException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.TableOfflineException; |
| import org.apache.accumulo.core.client.admin.DelegationTokenConfig; |
| import org.apache.accumulo.core.client.admin.SecurityOperations; |
| import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; |
| import org.apache.accumulo.core.client.impl.ClientContext; |
| import org.apache.accumulo.core.client.impl.DelegationTokenImpl; |
| import org.apache.accumulo.core.client.impl.OfflineScanner; |
| import org.apache.accumulo.core.client.impl.ScannerImpl; |
| import org.apache.accumulo.core.client.impl.Table; |
| import org.apache.accumulo.core.client.impl.Tables; |
| import org.apache.accumulo.core.client.impl.TabletLocator; |
| import org.apache.accumulo.core.client.mapred.impl.BatchInputSplit; |
| import org.apache.accumulo.core.client.mapreduce.InputTableConfig; |
| import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils; |
| import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; |
| import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; |
| import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; |
| import org.apache.accumulo.core.client.security.tokens.DelegationToken; |
| import org.apache.accumulo.core.client.security.tokens.KerberosToken; |
| import org.apache.accumulo.core.client.security.tokens.PasswordToken; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.data.impl.KeyExtent; |
| import org.apache.accumulo.core.master.state.tables.TableState; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * An abstract input format to provide shared methods common to all other input format classes. At |
| * the very least, any classes inheriting from this class will need to define their own |
| * {@link RecordReader}. |
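 *
 * <p>
 * Typical job setup looks roughly like the following sketch (the properties file path, table
 * name, and authorization label are placeholders; the table-specific setters live on concrete
 * subclasses such as {@link AccumuloInputFormat}):
 *
 * <pre>
 * {@code
 * JobConf job = new JobConf();
 * job.setInputFormat(AccumuloInputFormat.class);
 * AccumuloInputFormat.setClientPropertiesFile(job, "/path/to/accumulo-client.properties");
 * AccumuloInputFormat.setInputTableName(job, "mytable");
 * AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
 * }
 * </pre>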
| */ |
| public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { |
| protected static final Class<?> CLASS = AccumuloInputFormat.class; |
| protected static final Logger log = Logger.getLogger(CLASS); |
| |
| /** |
 * Sets the name of the classloader context to be used by scanners created for this job.
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param context |
| * name of the classloader context |
| * @since 1.8.0 |
| */ |
| public static void setClassLoaderContext(JobConf job, String context) { |
| InputConfigurator.setClassLoaderContext(CLASS, job, context); |
| } |
| |
| /** |
 * Returns the name of the current classloader context set on this job.
 *
 * @param job
 *          the Hadoop context for the configured job
| * @return name of the current context |
| * @since 1.8.0 |
| */ |
| public static String getClassLoaderContext(JobConf job) { |
| return InputConfigurator.getClassLoaderContext(CLASS, job); |
| } |
| |
| /** |
 * Sets the connection information needed to communicate with Accumulo for this job.
| * |
| * @param job |
| * Hadoop job instance to be configured |
| * @param info |
| * Connection information for Accumulo |
| * @since 2.0.0 |
| */ |
| public static void setClientInfo(JobConf job, ClientInfo info) { |
| ClientInfo inputInfo = InputConfigurator.updateToken(job.getCredentials(), info); |
| InputConfigurator.setClientInfo(CLASS, job, inputInfo); |
| } |
| |
| /** |
 * Sets the Accumulo client properties file used to connect to Accumulo.
| * |
| * @param job |
| * Hadoop job to be configured |
| * @param clientPropsFile |
| * URL to Accumulo client properties file |
| * @since 2.0.0 |
| */ |
| public static void setClientPropertiesFile(JobConf job, String clientPropsFile) { |
| InputConfigurator.setClientPropertiesFile(CLASS, job, clientPropsFile); |
| } |
| |
| /** |
 * Retrieves the {@link ClientInfo} from the configuration.
| * |
| * @param job |
| * Hadoop job instance configuration |
| * @return {@link ClientInfo} object |
| * @since 2.0.0 |
| */ |
| protected static ClientInfo getClientInfo(JobConf job) { |
| return InputConfigurator.getClientInfo(CLASS, job); |
| } |
| |
| /** |
| * Sets the connector information needed to communicate with Accumulo in this job. |
| * |
| * <p> |
| * <b>WARNING:</b> Some tokens, when serialized, divulge sensitive information in the |
| * configuration as a means to pass the token to MapReduce tasks. This information is BASE64 |
| * encoded to provide a charset safe conversion to a string, but this conversion is not intended |
 * to be secure. {@link PasswordToken} is one example that is insecure in this way; however,
 * {@link DelegationToken}s, acquired using
 * {@link SecurityOperations#getDelegationToken(DelegationTokenConfig)}, are not subject to this
 * concern.
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param principal |
 *          a valid Accumulo user name (the user must have READ permission on the tables being
 *          read)
 * @param token
 *          the user's authentication token
| * @since 1.5.0 |
| * @deprecated since 2.0.0, use {@link #setClientInfo(JobConf, ClientInfo)} instead |
| */ |
| @Deprecated |
| public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) |
| throws AccumuloSecurityException { |
| if (token instanceof KerberosToken) { |
| log.info("Received KerberosToken, attempting to fetch DelegationToken"); |
| try { |
| AccumuloClient client = Accumulo.newClient().usingClientInfo(getClientInfo(job)) |
| .usingToken(principal, token).build(); |
| token = client.securityOperations().getDelegationToken(new DelegationTokenConfig()); |
| } catch (Exception e) { |
| log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely" |
| + " fail to communicate with Accumulo", e); |
| } |
| } |
| // DelegationTokens can be passed securely from user to task without serializing insecurely in |
| // the configuration |
| if (token instanceof DelegationTokenImpl) { |
| DelegationTokenImpl delegationToken = (DelegationTokenImpl) token; |
| |
| // Convert it into a Hadoop Token |
| AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); |
| Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(), |
| delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); |
| |
| // Add the Hadoop Token to the Job so it gets serialized and passed along. |
| job.getCredentials().addToken(hadoopToken.getService(), hadoopToken); |
| } |
| |
| InputConfigurator.setConnectorInfo(CLASS, job, principal, token); |
| } |
| |
| /** |
| * Sets the connector information needed to communicate with Accumulo in this job. |
| * |
| * <p> |
| * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt |
| * to be more secure than storing it in the Configuration. |
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param principal |
 *          a valid Accumulo user name (the user must have READ permission on the tables being
 *          read)
| * @param tokenFile |
| * the path to the token file |
| * @since 1.6.0 |
| * @deprecated since 2.0.0, use {@link #setClientPropertiesFile(JobConf, String)} instead |
| */ |
| @Deprecated |
| public static void setConnectorInfo(JobConf job, String principal, String tokenFile) |
| throws AccumuloSecurityException { |
| setClientPropertiesFile(job, tokenFile); |
| } |
| |
| /** |
| * Determines if the connector has been configured. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return true if the connector has been configured, false otherwise |
| * @since 1.5.0 |
| * @see #setConnectorInfo(JobConf, String, AuthenticationToken) |
| */ |
| protected static Boolean isConnectorInfoSet(JobConf job) { |
| return InputConfigurator.isConnectorInfoSet(CLASS, job); |
| } |
| |
| /** |
| * Gets the user name from the configuration. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return the user name |
| * @since 1.5.0 |
| * @see #setConnectorInfo(JobConf, String, AuthenticationToken) |
| */ |
| protected static String getPrincipal(JobConf job) { |
| return InputConfigurator.getPrincipal(CLASS, job); |
| } |
| |
| /** |
 * Gets the authentication token from either the specified token file or directly from the
| * configuration, whichever was used when the job was configured. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return the principal's authentication token |
| * @since 1.6.0 |
| * @see #setConnectorInfo(JobConf, String, AuthenticationToken) |
| * @see #setConnectorInfo(JobConf, String, String) |
| */ |
| protected static AuthenticationToken getAuthenticationToken(JobConf job) { |
| AuthenticationToken token = InputConfigurator.getAuthenticationToken(CLASS, job); |
| return ConfiguratorBase.unwrapAuthenticationToken(job, token); |
| } |
| |
| /** |
| * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. |
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param clientConfig |
| * client configuration containing connection options |
| * @since 1.6.0 |
| * @deprecated since 2.0.0; Use {@link #setClientInfo(JobConf, ClientInfo)} instead. |
| */ |
| @Deprecated |
| public static void setZooKeeperInstance(JobConf job, |
| org.apache.accumulo.core.client.ClientConfiguration clientConfig) { |
| InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); |
| } |
| |
| /** |
| * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the |
| * configuration. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return an Accumulo instance |
| * @since 1.5.0 |
| * @deprecated since 2.0.0, Use {@link #getClientInfo(JobConf)} instead |
| */ |
| @Deprecated |
| protected static org.apache.accumulo.core.client.Instance getInstance(JobConf job) { |
| return InputConfigurator.getInstance(CLASS, job); |
| } |
| |
| /** |
| * Sets the log level for this job. |
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param level |
| * the logging level |
| * @since 1.5.0 |
| */ |
| public static void setLogLevel(JobConf job, Level level) { |
| InputConfigurator.setLogLevel(CLASS, job, level); |
| } |
| |
| /** |
| * Gets the log level from this configuration. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return the log level |
| * @since 1.5.0 |
| * @see #setLogLevel(JobConf, Level) |
| */ |
| protected static Level getLogLevel(JobConf job) { |
| return InputConfigurator.getLogLevel(CLASS, job); |
| } |
| |
| /** |
 * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a
 * subset of the user's authorizations. Defaults to the empty set.
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param auths |
| * the user's authorizations |
| * @since 1.5.0 |
| */ |
| public static void setScanAuthorizations(JobConf job, Authorizations auths) { |
| InputConfigurator.setScanAuthorizations(CLASS, job, auths); |
| } |
| |
| /** |
| * Gets the authorizations to set for the scans from the configuration. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @return the Accumulo scan authorizations |
| * @since 1.5.0 |
| * @see #setScanAuthorizations(JobConf, Authorizations) |
| */ |
| protected static Authorizations getScanAuthorizations(JobConf job) { |
| return InputConfigurator.getScanAuthorizations(CLASS, job); |
| } |
| |
| /** |
| * Fetch the client configuration from the job. |
| * |
| * @param job |
| * The job |
| * @return The client configuration for the job |
| * @since 1.7.0 |
| * @deprecated since 2.0.0, replaced by {@link #getClientInfo(JobConf)} |
| */ |
| @Deprecated |
| protected static org.apache.accumulo.core.client.ClientConfiguration getClientConfiguration( |
| JobConf job) { |
| return InputConfigurator.getClientConfiguration(CLASS, job); |
| } |
| |
| // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) |
| /** |
 * Checks whether a configuration is fully set up to be used with an Accumulo
| * {@link InputFormat}. |
| * |
| * @param job |
| * the Hadoop context for the configured job |
| * @throws java.io.IOException |
| * if the context is improperly configured |
| * @since 1.5.0 |
| */ |
| protected static void validateOptions(JobConf job) throws IOException { |
| AccumuloClient client = InputConfigurator.getClient(CLASS, job); |
| InputConfigurator.validatePermissions(CLASS, job, client); |
| } |
| |
| /** |
| * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job. |
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @return the {@link InputTableConfig} objects set on the job |
| * @since 1.6.0 |
| */ |
| public static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) { |
| return InputConfigurator.getInputTableConfigs(CLASS, job); |
| } |
| |
| /** |
 * Fetches an {@link InputTableConfig} that has been set on the configuration for a specific table.
| * |
| * <p> |
| * null is returned in the event that the table doesn't exist. |
| * |
| * @param job |
| * the Hadoop job instance to be configured |
| * @param tableName |
| * the table name for which to grab the config object |
| * @return the {@link InputTableConfig} for the given table |
| * @since 1.6.0 |
| */ |
| public static InputTableConfig getInputTableConfig(JobConf job, String tableName) { |
| return InputConfigurator.getInputTableConfig(CLASS, job, tableName); |
| } |
| |
| /** |
| * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader} |
| * instances that convert from Accumulo |
| * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to |
| * the user's K/V types. |
| * |
| * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to |
| * update the following variables: |
| * <ul> |
| * <li>Key {@link #currentKey} (used for progress reporting)</li> |
 * <li>long {@link #numKeysRead} (used for progress reporting)</li>
| * </ul> |
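 *
 * <p>
 * A typical {@code next} implementation for {@code K = Key, V = Value} looks like the following
 * sketch, which mirrors what the concrete input formats in this package do:
 *
 * <pre>
 * {@code
 * public boolean next(Key key, Value value) throws IOException {
 *   if (scannerIterator.hasNext()) {
 *     ++numKeysRead;
 *     Map.Entry<Key,Value> entry = scannerIterator.next();
 *     key.set(currentKey = entry.getKey());
 *     value.set(entry.getValue().get());
 *     return true;
 *   }
 *   return false;
 * }
 * }
 * </pre>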
| */ |
protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> {
protected long numKeysRead;
protected Key currentKey = null;
protected Iterator<Map.Entry<Key,Value>> scannerIterator;
protected RangeInputSplit split;
private org.apache.accumulo.core.client.mapreduce.RangeInputSplit baseSplit;
protected ScannerBase scannerBase;
| |
| /** |
 * Extracts iterator settings from the context to be used by the RecordReader.
| * |
| * @param job |
| * the Hadoop job configuration |
| * @param tableName |
| * the table name for which the scanner is configured |
 * @return a list of iterator settings for the given table
| * @since 1.7.0 |
| */ |
| protected abstract List<IteratorSetting> jobIterators(JobConf job, String tableName); |
| |
| /** |
| * Configures the iterators on a scanner for the given table name. |
| * |
| * @param job |
| * the Hadoop job configuration |
| * @param scanner |
| * the scanner for which to configure the iterators |
| * @param tableName |
| * the table name for which the scanner is configured |
| * @since 1.7.0 |
| */ |
| private void setupIterators(JobConf job, ScannerBase scanner, String tableName, |
| org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { |
| List<IteratorSetting> iterators = null; |
| |
| if (null == split) { |
| iterators = jobIterators(job, tableName); |
| } else { |
| iterators = split.getIterators(); |
| if (null == iterators) { |
| iterators = jobIterators(job, tableName); |
| } |
| } |
| |
| for (IteratorSetting iterator : iterators) |
| scanner.addScanIterator(iterator); |
| } |
| |
| /** |
 * Initializes a scanner over the given input split using this task attempt's configuration.
| */ |
| public void initialize(InputSplit inSplit, JobConf job) throws IOException { |
| baseSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) inSplit; |
| log.debug("Initializing input split: " + baseSplit); |
| |
| ClientContext context = new ClientContext(getClientInfo(job)); |
| AccumuloClient client; |
| try { |
| client = context.getClient(); |
| } catch (AccumuloException | AccumuloSecurityException e) { |
| throw new IllegalStateException(e); |
| } |
| Authorizations authorizations = getScanAuthorizations(job); |
| String classLoaderContext = getClassLoaderContext(job); |
| String table = baseSplit.getTableName(); |
| |
// in case the table name changed, we can still use the previous name in terms of
// configuration, but the scanner will use the table id resolved at job setup time
| InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName()); |
| |
| log.debug("Created client with user: " + client.whoami()); |
| log.debug("Creating scanner for table: " + table); |
| log.debug("Authorizations are: " + authorizations); |
| |
| if (baseSplit instanceof BatchInputSplit) { |
| BatchScanner scanner; |
| BatchInputSplit multiRangeSplit = (BatchInputSplit) baseSplit; |
| |
| try { |
// Note: BatchScanner will use at most one thread per tablet; currently, a BatchInputSplit
// does not span tablets
| int scanThreads = 1; |
| scanner = client.createBatchScanner(baseSplit.getTableName(), authorizations, |
| scanThreads); |
| setupIterators(job, scanner, baseSplit.getTableName(), baseSplit); |
| if (null != classLoaderContext) { |
| scanner.setClassLoaderContext(classLoaderContext); |
| } |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| |
| scanner.setRanges(multiRangeSplit.getRanges()); |
| scannerBase = scanner; |
| |
| } else if (baseSplit instanceof RangeInputSplit) { |
| split = (RangeInputSplit) baseSplit; |
| Boolean isOffline = baseSplit.isOffline(); |
| if (null == isOffline) { |
| isOffline = tableConfig.isOfflineScan(); |
| } |
| |
| Boolean isIsolated = baseSplit.isIsolatedScan(); |
| if (null == isIsolated) { |
| isIsolated = tableConfig.shouldUseIsolatedScanners(); |
| } |
| |
| Boolean usesLocalIterators = baseSplit.usesLocalIterators(); |
| if (null == usesLocalIterators) { |
| usesLocalIterators = tableConfig.shouldUseLocalIterators(); |
| } |
| |
| Scanner scanner; |
| |
| try { |
| if (isOffline) { |
| scanner = new OfflineScanner(context, Table.ID.of(baseSplit.getTableId()), |
| authorizations); |
| } else { |
| scanner = new ScannerImpl(context, Table.ID.of(baseSplit.getTableId()), authorizations); |
| } |
| if (isIsolated) { |
| log.info("Creating isolated scanner"); |
| scanner = new IsolatedScanner(scanner); |
| } |
| if (usesLocalIterators) { |
| log.info("Using local iterators"); |
| scanner = new ClientSideIteratorScanner(scanner); |
| } |
| setupIterators(job, scanner, baseSplit.getTableName(), baseSplit); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| |
| scanner.setRange(baseSplit.getRange()); |
| scannerBase = scanner; |
| } else { |
| throw new IllegalArgumentException("Can not initialize from " + baseSplit.getClass()); |
| } |
| |
| Collection<Pair<Text,Text>> columns = baseSplit.getFetchedColumns(); |
| if (null == columns) { |
| columns = tableConfig.getFetchedColumns(); |
| } |
| |
// restrict the scanner to the columns (or column families) configured for this split
| for (Pair<Text,Text> c : columns) { |
| if (c.getSecond() != null) { |
| log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); |
| scannerBase.fetchColumn(c.getFirst(), c.getSecond()); |
| } else { |
| log.debug("Fetching column family " + c.getFirst()); |
| scannerBase.fetchColumnFamily(c.getFirst()); |
| } |
| } |
| |
| SamplerConfiguration samplerConfig = baseSplit.getSamplerConfiguration(); |
| if (null == samplerConfig) { |
| samplerConfig = tableConfig.getSamplerConfiguration(); |
| } |
| |
| if (samplerConfig != null) { |
| scannerBase.setSamplerConfiguration(samplerConfig); |
| } |
| |
| Map<String,String> executionHints = baseSplit.getExecutionHints(); |
| if (executionHints == null || executionHints.size() == 0) { |
| executionHints = tableConfig.getExecutionHints(); |
| } |
| |
| if (executionHints != null) { |
| scannerBase.setExecutionHints(executionHints); |
| } |
| |
| scannerIterator = scannerBase.iterator(); |
| numKeysRead = 0; |
| } |
| |
| @Override |
| public void close() { |
| if (null != scannerBase) { |
| scannerBase.close(); |
| } |
| } |
| |
| @Override |
| public long getPos() throws IOException { |
| return numKeysRead; |
| } |
| |
| @Override |
| public float getProgress() throws IOException { |
| if (numKeysRead > 0 && currentKey == null) |
| return 1.0f; |
| return baseSplit.getProgress(currentKey); |
| } |
| |
| } |
| |
| Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, Table.ID tableId, |
| List<Range> ranges) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| ClientContext context = new ClientContext(getClientInfo(job)); |
| return InputConfigurator.binOffline(tableId, ranges, context); |
| } |
| |
| /** |
| * Gets the splits of the tables that have been set on the job by reading the metadata table for |
| * the specified ranges. |
| * |
| * @return the splits from the tables based on the ranges. |
| * @throws java.io.IOException |
| * if a table set on the job doesn't exist or an error occurs initializing the tablet |
| * locator |
| */ |
| @Override |
| public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { |
| Level logLevel = getLogLevel(job); |
| log.setLevel(logLevel); |
| validateOptions(job); |
| |
| Random random = new SecureRandom(); |
| LinkedList<InputSplit> splits = new LinkedList<>(); |
| Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job); |
| for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { |
| String tableName = tableConfigEntry.getKey(); |
| InputTableConfig tableConfig = tableConfigEntry.getValue(); |
| |
| ClientContext context = new ClientContext(getClientInfo(job)); |
| Table.ID tableId; |
| // resolve table name to id once, and use id from this point forward |
| try { |
| tableId = Tables.getTableId(context, tableName); |
| } catch (TableNotFoundException e) { |
| throw new IOException(e); |
| } |
| |
| boolean batchScan = InputConfigurator.isBatchScan(CLASS, job); |
| boolean supportBatchScan = !(tableConfig.isOfflineScan() |
| || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators()); |
| if (batchScan && !supportBatchScan) |
| throw new IllegalArgumentException("BatchScanner optimization not available for offline" |
| + " scan, isolated, or local iterators"); |
| |
| boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); |
| if (batchScan && !autoAdjust) |
| throw new IllegalArgumentException( |
| "AutoAdjustRanges must be enabled when using BatchScanner optimization"); |
| |
| List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) |
| : tableConfig.getRanges(); |
| if (ranges.isEmpty()) { |
| ranges = new ArrayList<>(1); |
| ranges.add(new Range()); |
| } |
| |
| // get the metadata information for these ranges |
| Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); |
| TabletLocator tl; |
| try { |
| if (tableConfig.isOfflineScan()) { |
| binnedRanges = binOfflineTable(job, tableId, ranges); |
| while (binnedRanges == null) { |
| // Some tablets were still online, try again |
| // sleep randomly between 100 and 200 ms |
| sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS); |
| binnedRanges = binOfflineTable(job, tableId, ranges); |
| } |
| } else { |
| tl = InputConfigurator.getTabletLocator(CLASS, job, tableId); |
// it's possible that the cache could contain complete but old information about a table's
// tablets... so clear it
| tl.invalidateCache(); |
| |
| while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { |
| String tableIdStr = tableId.canonicalID(); |
| if (!Tables.exists(context, tableId)) |
| throw new TableDeletedException(tableIdStr); |
| if (Tables.getTableState(context, tableId) == TableState.OFFLINE) |
| throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); |
| binnedRanges.clear(); |
| log.warn("Unable to locate bins for specified ranges. Retrying."); |
| // sleep randomly between 100 and 200 ms |
| sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS); |
| tl.invalidateCache(); |
| } |
| } |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| |
| HashMap<Range,ArrayList<String>> splitsToAdd = null; |
| |
| if (!autoAdjust) |
| splitsToAdd = new HashMap<>(); |
| |
| HashMap<String,String> hostNameCache = new HashMap<>(); |
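// resolve each tablet server IP to a canonical hostname at most once, caching the result,
// since the DNS lookup below is relatively expensive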
| for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { |
| String ip = tserverBin.getKey().split(":", 2)[0]; |
| String location = hostNameCache.get(ip); |
| if (location == null) { |
| InetAddress inetAddress = InetAddress.getByName(ip); |
| location = inetAddress.getCanonicalHostName(); |
| hostNameCache.put(ip, location); |
| } |
| for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { |
| Range ke = extentRanges.getKey().toDataRange(); |
| if (batchScan) { |
| // group ranges by tablet to be read by a BatchScanner |
| ArrayList<Range> clippedRanges = new ArrayList<>(); |
| for (Range r : extentRanges.getValue()) |
| clippedRanges.add(ke.clip(r)); |
| |
| BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, |
| new String[] {location}); |
| SplitUtils.updateSplit(split, tableConfig, logLevel); |
| |
| splits.add(split); |
| } else { |
| // not grouping by tablet |
| for (Range r : extentRanges.getValue()) { |
| if (autoAdjust) { |
| // divide ranges into smaller ranges, based on the tablets |
| RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), |
| ke.clip(r), new String[] {location}); |
| SplitUtils.updateSplit(split, tableConfig, logLevel); |
| split.setOffline(tableConfig.isOfflineScan()); |
| split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); |
| split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); |
| |
| splits.add(split); |
| } else { |
| // don't divide ranges |
| ArrayList<String> locations = splitsToAdd.get(r); |
| if (locations == null) |
| locations = new ArrayList<>(1); |
| locations.add(location); |
| splitsToAdd.put(r, locations); |
| } |
| } |
| } |
| } |
| } |
| |
| if (!autoAdjust) |
| for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { |
| RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonicalID(), |
| entry.getKey(), entry.getValue().toArray(new String[0])); |
| SplitUtils.updateSplit(split, tableConfig, logLevel); |
| split.setOffline(tableConfig.isOfflineScan()); |
| split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); |
| split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); |
| |
| splits.add(split); |
| } |
| } |
| |
| return splits.toArray(new InputSplit[splits.size()]); |
| } |
| } |