/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hadoop.cql3;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TokenRange;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.utils.*;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
*
 * At minimum, you need to set the keyspace and column family in your Hadoop job Configuration.
 * The ConfigHelper class is provided to make this simple:
 * ConfigHelper.setInputColumnFamily
*
 * You can also configure the size of an InputSplit with either:
 * 1: ConfigHelper.setInputSplitSize. The number of rows per split; the default is 64k rows.
 * 2: ConfigHelper.setInputSplitSizeInMb. The split size in MB, a newer and more precise method.
 * If no value is provided for InputSplitSizeInMb, we fall back to using InputSplitSize.
*
 * CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
 * should set it "as big as possible, but no bigger." It sets the LIMIT for the CQL
 * query, so it needs to be big enough to minimize network overhead, but not so big
 * that it causes out-of-memory issues.
*
 * Other native protocol connection parameters can be set via CqlConfigHelper.
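 *
 * Example (a minimal sketch; the keyspace, table, and address values below are placeholders):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_table");
 * ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
 * ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner");
 * ConfigHelper.setInputSplitSizeInMb(conf, 64);         // optional; otherwise setInputSplitSize is used
 * CqlConfigHelper.setInputCQLPageRowSize(conf, "1000"); // optional
 * Job job = Job.getInstance(conf, "cassandra-input-job");
 * job.setInputFormatClass(CqlInputFormat.class);
 * }</pre>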
*/
public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long, Row> implements org.apache.hadoop.mapred.InputFormat<Long, Row>
{
public static final String MAPRED_TASK_ID = "mapred.task.id";
private static final Logger logger = LoggerFactory.getLogger(CqlInputFormat.class);
private String keyspace;
private String cfName;
private IPartitioner partitioner;
public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
throws IOException
{
TaskAttemptContext tac = HadoopCompat.newMapContext(
jobConf,
TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
null,
null,
null,
new ReporterWrapper(reporter),
null);
CqlRecordReader recordReader = new CqlRecordReader();
recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
return recordReader;
}
@Override
public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException
{
return new CqlRecordReader();
}
protected void validateConfiguration(Configuration conf)
{
if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
{
throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()");
}
if (ConfigHelper.getInputInitialAddress(conf) == null)
throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
if (ConfigHelper.getInputPartitioner(conf) == null)
throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
}
public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext context) throws IOException
{
Configuration conf = HadoopCompat.getConfiguration(context);
validateConfiguration(conf);
keyspace = ConfigHelper.getInputKeyspace(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
partitioner = ConfigHelper.getInputPartitioner(conf);
logger.trace("partitioner is {}", partitioner);
// canonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = executorFactory().pooled("HadoopInput", 128);
List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
String[] inputInitialAddress = ConfigHelper.getInputInitialAddress(conf).split(",");
try (Cluster cluster = CqlConfigHelper.getInputCluster(inputInitialAddress, conf);
Session session = cluster.connect())
{
List<SplitFuture> splitfutures = new ArrayList<>();
//TODO if the job range is defined and does not align exactly with token boundaries, then the logic will be unable to get estimates since they are pre-computed per token range
// tokens: [0, 10, 20]
// job range: [0, 10) - able to get estimate
// job range: [5, 15) - unable to get estimate
Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
Range<Token> jobRange = null;
if (jobKeyRange != null)
{
jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.left),
partitioner.getTokenFactory().fromString(jobKeyRange.right));
}
Metadata metadata = cluster.getMetadata();
// canonical ranges and nodes holding replicas
Map<TokenRange, List<Host>> masterRangeNodes = getRangeMap(keyspace, metadata, getTargetDC(metadata, inputInitialAddress));
for (TokenRange range : masterRangeNodes.keySet())
{
if (jobRange == null)
{
for (TokenRange unwrapped : range.unwrap())
{
// for each tokenRange, pick a live owner and ask it for the byte-sized splits
SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
executor.submit(task);
splitfutures.add(task);
}
}
else
{
TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
if (range.intersects(jobTokenRange))
{
for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
for (TokenRange unwrapped : intersection.unwrap())
{
// for each tokenRange, pick a live owner and ask it for the byte-sized splits
SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
executor.submit(task);
splitfutures.add(task);
}
}
}
}
}
// wait until we have all the results back
List<SplitFuture> failedTasks = new ArrayList<>();
int maxSplits = 0;
long expectedPartitionsForFailedRanges = 0;
for (SplitFuture task : splitfutures)
{
try
{
List<ColumnFamilySplit> tokenRangeSplits = task.get();
if (tokenRangeSplits.size() > maxSplits)
{
maxSplits = tokenRangeSplits.size();
expectedPartitionsForFailedRanges = tokenRangeSplits.get(0).getLength();
}
splits.addAll(tokenRangeSplits);
}
catch (Exception e)
{
failedTasks.add(task);
}
}
// The estimate is only stored on a single host; if that host is down then we can not get the estimate.
// It's more than likely that a single host could be "too large" for one split, but there is no way of
// knowing!
// This logic attempts to guess the estimate from all the successful ranges.
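// e.g. if the largest successful range produced 8 splits of ~N partitions each, then each failed
// range is likewise split 8 ways and each resulting split is assigned ~N partitions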
if (!failedTasks.isEmpty())
{
// if every split failed this will be 0
if (maxSplits == 0)
throwAllSplitsFailed(failedTasks);
for (SplitFuture task : failedTasks)
{
try
{
// the task failed, so this should throw
task.get();
}
catch (Exception cause)
{
logger.warn("Unable to get estimate for {}, the host {} had a exception; falling back to default estimate", task.splitCallable.tokenRange, task.splitCallable.hosts.get(0), cause);
}
}
for (SplitFuture task : failedTasks)
splits.addAll(toSplit(task.splitCallable.hosts, splitTokenRange(task.splitCallable.tokenRange, maxSplits, expectedPartitionsForFailedRanges)));
}
}
finally
{
executor.shutdownNow();
}
assert splits.size() > 0;
Collections.shuffle(splits, new Random(nanoTime()));
return splits;
}
private static IllegalStateException throwAllSplitsFailed(List<SplitFuture> failedTasks)
{
IllegalStateException exception = new IllegalStateException("No successful tasks found");
for (SplitFuture task : failedTasks)
{
try
{
// the task failed, so this should throw
task.get();
}
catch (Exception cause)
{
exception.addSuppressed(cause);
}
}
throw exception;
}
private static String getTargetDC(Metadata metadata, String[] inputInitialAddress)
{
BiMultiValMap<InetAddress, String> addressToDc = new BiMultiValMap<>();
Multimap<String, InetAddress> dcToAddresses = addressToDc.inverse();
// the only way to match is via the broadcast addresses, so do an existence check for all hosts
Set<InetAddress> addresses = new HashSet<>(inputInitialAddress.length);
for (String inputAddress : inputInitialAddress)
addresses.addAll(parseAddress(inputAddress));
for (Host host : metadata.getAllHosts())
{
InetAddress address = host.getBroadcastAddress();
if (addresses.contains(address))
addressToDc.put(address, host.getDatacenter());
}
switch (dcToAddresses.keySet().size())
{
case 1:
return Iterables.getOnlyElement(dcToAddresses.keySet());
case 0:
throw new IllegalStateException("Input addresses could not be used to find DC; non match client metadata");
default:
// Multiple DCs found; attempt to pick the first based off the address list. This mimics the 2.1
// behavior, which connected to the nodes in order and used the DC of the first node it could
// connect to as the local DC; since the client abstracts this away, we rely on presence in the
// cluster metadata as a proxy for connectivity.
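// e.g. inputInitialAddress = "10.0.1.5,10.0.2.5" where 10.0.1.5 is in dc1 and 10.0.2.5 is in dc2
// resolves to dc1, because 10.0.1.5 is the first listed address that matches a known host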
for (String inputAddress : inputInitialAddress)
{
for (InetAddress add : parseAddress(inputAddress))
{
String dc = addressToDc.get(add);
// it's possible the address isn't in the cluster (and was dropped by the client), so ignore null
if (dc != null)
return dc;
}
}
// somehow we were able to connect to the cluster and find multiple DCs by matching, yet couldn't
// match again...
throw new AssertionError("Unable to infer datacenter from initial addresses; multiple datacenters found "
+ dcToAddresses.keySet() + ", should only use addresses from one datacenter");
}
}
private static List<InetAddress> parseAddress(String str)
{
try
{
return Arrays.asList(InetAddress.getAllByName(str));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
{
return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
}
private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Host host, Configuration conf, Session session)
{
int splitSize = ConfigHelper.getInputSplitSize(conf);
int splitSizeMiB = ConfigHelper.getInputSplitSizeInMb(conf);
return describeSplits(keyspace, cfName, range, host, splitSize, splitSizeMiB, session);
}
private static Map<TokenRange, List<Host>> getRangeMap(String keyspace, Metadata metadata, String targetDC)
{
return CqlClientHelper.getLocalPrimaryRangeForDC(keyspace, metadata, targetDC);
}
private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, Host host, int splitSize, int splitSizeMb, Session session)
{
// In 2.1 the host list was walked in order (only moving to the next host on IOException) and called
// org.apache.cassandra.service.StorageService.getSplits(java.lang.String, java.lang.String, org.apache.cassandra.dht.Range<org.apache.cassandra.dht.Token>, int);
// that call computed totalRowCountEstimate (used to compute the number of splits) and then split the ring based off those estimates.
//
// The main difference is that the estimates in 2.1 were computed from the data, so any replica could answer.
// In 3.0 we rely on the CQL query below, which is local and only computes estimates for the primary range; this
// puts us in a sticky spot: if the node fails, what do we do? The 3.0 behavior only matches 2.1 IFF all
// nodes are up and healthy.
ResultSet resultSet = queryTableEstimates(session, host, keyspace, table, tokenRange);
Row row = resultSet.one();
long meanPartitionSize = 0;
long partitionCount = 0;
int splitCount = 0;
if (row != null)
{
meanPartitionSize = row.getLong("mean_partition_size");
partitionCount = row.getLong("partitions_count");
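// e.g. with a mean partition size of 4096 bytes, 1,000,000 partitions and splitSizeMb = 64,
// this yields 4096 * 1000000 / 64 / 1024 / 1024 ~= 61 splits of roughly 64 MiB each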
splitCount = splitSizeMb > 0
? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
: (int)(partitionCount / splitSize);
}
// If we have no data on this split or the size estimate is 0,
// return the full split i.e., do not sub-split
// Assume smallest granularity of partition count available from CASSANDRA-7688
if (splitCount == 0)
{
Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
wrappedTokenRange.put(tokenRange, partitionCount == 0 ? 128L : partitionCount);
return wrappedTokenRange;
}
return splitTokenRange(tokenRange, splitCount, partitionCount / splitCount);
}
private static ResultSet queryTableEstimates(Session session, Host host, String keyspace, String table, TokenRange tokenRange)
{
try
{
String query = String.format("SELECT mean_partition_size, partitions_count " +
"FROM %s.%s " +
"WHERE keyspace_name = ? AND table_name = ? AND range_type = '%s' AND range_start = ? AND range_end = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.TABLE_ESTIMATES,
SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY);
Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
return session.execute(stmt);
}
catch (InvalidQueryException e)
{
// if the table doesn't exist, fall back to the old table. This is likely to return no records in a
// multi-DC setup, but should work fine in a single-DC setup.
String query = String.format("SELECT mean_partition_size, partitions_count " +
"FROM %s.%s " +
"WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_SIZE_ESTIMATES);
Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
return session.execute(stmt);
}
}
private static Map<TokenRange, Long> splitTokenRange(TokenRange tokenRange, int splitCount, long partitionCount)
{
List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
Map<TokenRange, Long> rangesWithLength = Maps.newHashMapWithExpectedSize(splitRanges.size());
for (TokenRange range : splitRanges)
rangesWithLength.put(range, partitionCount);
return rangesWithLength;
}
// Old Hadoop API
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()];
for (int i = 0; i < newInputSplits.size(); i++)
oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
return oldInputSplits;
}
/**
 * Takes a token range and splits it up according to the suggested
 * size into input splits that Hadoop can use.
*/
class SplitCallable implements Callable<List<ColumnFamilySplit>>
{
private final TokenRange tokenRange;
private final List<Host> hosts;
private final Configuration conf;
private final Session session;
public SplitCallable(TokenRange tokenRange, List<Host> hosts, Configuration conf, Session session)
{
Preconditions.checkArgument(!hosts.isEmpty(), "hosts list requires at least 1 host but was empty");
this.tokenRange = tokenRange;
this.hosts = hosts;
this.conf = conf;
this.session = session;
}
public List<ColumnFamilySplit> call() throws Exception
{
Map<TokenRange, Long> subSplits = getSubSplits(keyspace, cfName, tokenRange, hosts.get(0), conf, session);
return toSplit(hosts, subSplits);
}
}
private static class SplitFuture extends FutureTask<List<ColumnFamilySplit>>
{
private final SplitCallable splitCallable;
SplitFuture(SplitCallable splitCallable)
{
super(splitCallable);
this.splitCallable = splitCallable;
}
}
private List<ColumnFamilySplit> toSplit(List<Host> hosts, Map<TokenRange, Long> subSplits)
{
// turn the sub-ranges into InputSplits
String[] endpoints = new String[hosts.size()];
// hadoop needs hostname, not ip
int endpointIndex = 0;
for (Host endpoint : hosts)
endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
ArrayList<ColumnFamilySplit> splits = new ArrayList<>();
for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
{
TokenRange subrange = subSplitEntry.getKey();
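// the driver renders ordered-partitioner (OPP/BOP) tokens as 0x-prefixed hex; strip the prefix
// (hence substring(2)) before storing the token string in the split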
ColumnFamilySplit split =
new ColumnFamilySplit(
partitionerIsOpp ?
subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
partitionerIsOpp ?
subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
subSplitEntry.getValue(),
endpoints);
logger.trace("adding {}", split);
splits.add(split);
}
return splits;
}
}