| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.hadoop; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.datastax.driver.core.Cluster; |
| import com.datastax.driver.core.Host; |
| import com.datastax.driver.core.Metadata; |
| import com.datastax.driver.core.ResultSet; |
| import com.datastax.driver.core.Row; |
| import com.datastax.driver.core.Session; |
| import com.datastax.driver.core.TokenRange; |
| import org.apache.cassandra.db.SystemKeyspace; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; |
| import org.apache.cassandra.thrift.KeyRange; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| |
| public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> |
| { |
| private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyInputFormat.class); |
| |
| public static final String MAPRED_TASK_ID = "mapred.task.id"; |
| // The simple fact that we need this is because the old Hadoop API wants us to "write" |
| // to the key and value whereas the new asks for it. |
| // I choose 8kb as the default max key size (instantiated only once), but you can |
| // override it in your jobConf with this setting. |
| public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size"; |
| public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; |
| |
| private String keyspace; |
| private String cfName; |
| private IPartitioner partitioner; |
| |
| protected void validateConfiguration(Configuration conf) |
| { |
| if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null) |
| { |
| throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()"); |
| } |
| if (ConfigHelper.getInputInitialAddress(conf) == null) |
| throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress"); |
| if (ConfigHelper.getInputPartitioner(conf) == null) |
| throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner"); |
| } |
| |
| public List<InputSplit> getSplits(JobContext context) throws IOException |
| { |
| Configuration conf = HadoopCompat.getConfiguration(context); |
| |
| validateConfiguration(conf); |
| |
| keyspace = ConfigHelper.getInputKeyspace(conf); |
| cfName = ConfigHelper.getInputColumnFamily(conf); |
| partitioner = ConfigHelper.getInputPartitioner(conf); |
| logger.trace("partitioner is {}", partitioner); |
| |
| // canonical ranges and nodes holding replicas |
| Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace); |
| |
| // canonical ranges, split into pieces, fetching the splits in parallel |
| ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); |
| List<InputSplit> splits = new ArrayList<>(); |
| |
| List<Future<List<InputSplit>>> splitfutures = new ArrayList<>(); |
| KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); |
| Range<Token> jobRange = null; |
| if (jobKeyRange != null) |
| { |
| if (jobKeyRange.start_key != null) |
| { |
| if (!partitioner.preservesOrder()) |
| throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner"); |
| if (jobKeyRange.start_token != null) |
| throw new IllegalArgumentException("only start_key supported"); |
| if (jobKeyRange.end_token != null) |
| throw new IllegalArgumentException("only start_key supported"); |
| jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key), |
| partitioner.getToken(jobKeyRange.end_key)); |
| } |
| else if (jobKeyRange.start_token != null) |
| { |
| jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), |
| partitioner.getTokenFactory().fromString(jobKeyRange.end_token)); |
| } |
| else |
| { |
| logger.warn("ignoring jobKeyRange specified without start_key or start_token"); |
| } |
| } |
| |
| try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf); |
| Session session = cluster.connect()) |
| { |
| Metadata metadata = session.getCluster().getMetadata(); |
| |
| for (TokenRange range : masterRangeNodes.keySet()) |
| { |
| if (jobRange == null) |
| { |
| // for each tokenRange, pick a live owner and ask it to compute bite-sized splits |
| splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session))); |
| } |
| else |
| { |
| TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange); |
| if (range.intersects(jobTokenRange)) |
| { |
| for (TokenRange intersection: range.intersectWith(jobTokenRange)) |
| { |
| // for each tokenRange, pick a live owner and ask it to compute bite-sized splits |
| splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session))); |
| } |
| } |
| } |
| } |
| |
| // wait until we have all the results back |
| for (Future<List<InputSplit>> futureInputSplits : splitfutures) |
| { |
| try |
| { |
| splits.addAll(futureInputSplits.get()); |
| } |
| catch (Exception e) |
| { |
| throw new IOException("Could not get input splits", e); |
| } |
| } |
| } |
| finally |
| { |
| executor.shutdownNow(); |
| } |
| |
| assert splits.size() > 0; |
| Collections.shuffle(splits, new Random(System.nanoTime())); |
| return splits; |
| } |
| |
| private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range) |
| { |
| return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)), |
| metadata.newToken(partitioner.getTokenFactory().toString(range.right))); |
| } |
| |
| /** |
| * Gets a token tokenRange and splits it up according to the suggested |
| * size into input splits that Hadoop can use. |
| */ |
| class SplitCallable implements Callable<List<InputSplit>> |
| { |
| |
| private final TokenRange tokenRange; |
| private final Set<Host> hosts; |
| private final Configuration conf; |
| private final Session session; |
| |
| public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session) |
| { |
| this.tokenRange = tr; |
| this.hosts = hosts; |
| this.conf = conf; |
| this.session = session; |
| } |
| |
| public List<InputSplit> call() throws Exception |
| { |
| ArrayList<InputSplit> splits = new ArrayList<>(); |
| Map<TokenRange, Long> subSplits; |
| subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session); |
| // turn the sub-ranges into InputSplits |
| String[] endpoints = new String[hosts.size()]; |
| |
| // hadoop needs hostname, not ip |
| int endpointIndex = 0; |
| for (Host endpoint : hosts) |
| endpoints[endpointIndex++] = endpoint.getAddress().getHostName(); |
| |
| for (TokenRange subSplit : subSplits.keySet()) |
| { |
| List<TokenRange> ranges = subSplit.unwrap(); |
| for (TokenRange subrange : ranges) |
| { |
| ColumnFamilySplit split = |
| new ColumnFamilySplit( |
| partitioner.preservesOrder() ? |
| subrange.getStart().toString().substring(2) : subrange.getStart().toString(), |
| partitioner.preservesOrder() ? |
| subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(), |
| subSplits.get(subSplit), |
| endpoints); |
| |
| logger.trace("adding {}", split); |
| splits.add(split); |
| } |
| } |
| return splits; |
| } |
| } |
| |
| private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException |
| { |
| int splitSize = ConfigHelper.getInputSplitSize(conf); |
| int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf); |
| try |
| { |
| return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session); |
| } |
| catch (Exception e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace) |
| { |
| try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf)) |
| { |
| Map<TokenRange, Set<Host>> map = new HashMap<>(); |
| Metadata metadata = cluster.connect().getCluster().getMetadata(); |
| for (TokenRange tokenRange : metadata.getTokenRanges()) |
| map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange)); |
| return map; |
| } |
| } |
| |
| private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session) |
| { |
| String query = String.format("SELECT mean_partition_size, partitions_count " + |
| "FROM %s.%s " + |
| "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?", |
| SystemKeyspace.NAME, |
| SystemKeyspace.SIZE_ESTIMATES); |
| |
| ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); |
| |
| Row row = resultSet.one(); |
| // If we have no data on this split, return the full split i.e., do not sub-split |
| // Assume smallest granularity of partition count available from CASSANDRA-7688 |
| if (row == null) |
| { |
| Map<TokenRange, Long> wrappedTokenRange = new HashMap<>(); |
| wrappedTokenRange.put(tokenRange, (long) 128); |
| return wrappedTokenRange; |
| } |
| |
| long meanPartitionSize = row.getLong("mean_partition_size"); |
| long partitionCount = row.getLong("partitions_count"); |
| |
| int splitCount = splitSizeMb > 0 |
| ? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024) |
| : (int)(partitionCount / splitSize); |
| |
| if (splitCount <= 0) splitCount = 1; |
| List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); |
| Map<TokenRange, Long> rangesWithLength = new HashMap<>(); |
| for (TokenRange range : splitRanges) |
| rangesWithLength.put(range, partitionCount/splitCount); |
| |
| return rangesWithLength; |
| } |
| |
| // Old Hadoop API |
| public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException |
| { |
| TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); |
| List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac); |
| org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()]; |
| for (int i = 0; i < newInputSplits.size(); i++) |
| oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i); |
| return oldInputSplits; |
| } |
| } |