| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.hadoop.cql3; |
| |
| import java.io.IOException; |
| |
| import org.apache.cassandra.hadoop.HadoopCompat; |
| import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; |
| import org.apache.cassandra.hadoop.ReporterWrapper; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| |
| import com.datastax.driver.core.Row; |
| |
| /** |
| * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. |
| * |
 * At minimum, you need to set the keyspace and column family in your Hadoop job
 * Configuration. The ConfigHelper class is provided to make this simple:
 *   ConfigHelper.setInputColumnFamily
| * |
 * You can also configure the size of an InputSplit in one of two ways:
 *   1: ConfigHelper.setInputSplitSize, the number of rows per split. The default
 *      split size is 64k rows.
 *   or
 *   2: ConfigHelper.setInputSplitSizeInMb, a newer and more precise option that sets
 *      the InputSplit size in MB. If no value is provided for InputSplitSizeInMb,
 *      InputSplitSize is used.
| * |
 * CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
 * should set it to "as big as possible, but no bigger." It sets the LIMIT for the CQL
 * query, so you need to set it big enough to minimize the network overhead, but not
 * so big that it causes out-of-memory issues.
 *
 * Other native protocol connection parameters can be set through CqlConfigHelper.
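 *
 * A minimal wiring sketch (assuming the Hadoop 2 mapreduce API; the contact point,
 * keyspace, and table names below are illustrative placeholders, not defaults):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf);
 * job.setInputFormatClass(CqlInputFormat.class);
 * ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
 * ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
 * ConfigHelper.setInputColumnFamily(job.getConfiguration(), "myKeyspace", "myTable");
 * ConfigHelper.setInputSplitSize(job.getConfiguration(), 64 * 1024);      // rows per split
 * CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "1000"); // CQL LIMIT per page
 * }</pre>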
| */ |
| public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> |
| { |
| public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) |
| throws IOException |
| { |
        TaskAttemptContext tac = HadoopCompat.newMapContext(
                jobConf,
                TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
                null, // record reader: not needed to construct the context
                null, // record writer: not needed
                null, // output committer: not needed
                new ReporterWrapper(reporter),
                null); // input split: not needed

        CqlRecordReader recordReader = new CqlRecordReader();
        // the mapred split handed in here is expected to also be a mapreduce
        // InputSplit (e.g. ColumnFamilySplit), so the cast is safe
        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit) split, tac);
        return recordReader;
| } |
| |
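    // New-style (mapreduce) API entry point; the framework calls
    // RecordReader.initialize(split, context) itself, so just return the reader.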
| @Override |
    public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
            org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException
| { |
| return new CqlRecordReader(); |
| } |
| |
| } |