| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.hadoop; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.auth.PasswordAuthenticator; |
| import org.apache.cassandra.thrift.*; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.apache.thrift.protocol.TProtocol; |
| import org.apache.thrift.transport.TTransport; |
| import org.apache.thrift.transport.TTransportException; |
| |
| /** |
| * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. |
| * |
 * At minimum, you need to set the keyspace, the column family, and a slice predicate (a
 * description of the columns to extract from each row) in your Hadoop job Configuration.
 * The ConfigHelper class is provided to make this simple:
 *   ConfigHelper.setInputColumnFamily
 *   ConfigHelper.setInputSlicePredicate
 * (see the sketch at the end of this comment)
| * |
 * You can also configure the number of rows per InputSplit with
 *   ConfigHelper.setInputSplitSize
 * This should be "as big as possible, but no bigger." Each InputSplit is read from Cassandra
 * with multiple get_range_slices queries, and the per-call overhead of get_range_slices is
 * high, so larger split sizes are better -- but if a split is too large, the map task will
 * run out of memory.
| * |
| * The default split size is 64k rows. |
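 *
 * A minimal sketch of a job setup follows. The contact address, partitioner, and the
 * "MyKeyspace"/"MyColumnFamily" names are illustrative placeholders, not part of this API:
 * <pre>
 *   Configuration conf = new Configuration();
 *   ConfigHelper.setInputInitialAddress(conf, "localhost");
 *   ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner");
 *   ConfigHelper.setInputColumnFamily(conf, "MyKeyspace", "MyColumnFamily");
 *   // select every column of every row
 *   SliceRange sliceRange = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
 *                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
 *                                          false, Integer.MAX_VALUE);
 *   ConfigHelper.setInputSlicePredicate(conf, new SlicePredicate().setSlice_range(sliceRange));
 *   ConfigHelper.setInputSplitSize(conf, 64 * 1024); // rows per split; this is the default
 *   Job job = new Job(conf);
 *   job.setInputFormatClass(ColumnFamilyInputFormat.class);
 * </pre>
 *
 * @deprecated this format reads through Cassandra's Thrift interface, which is itself deprecated.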
| */ |
| @Deprecated |
| public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); |
| |
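    /**
     * Opens a Thrift transport to the given host and port, binds the client to the
     * configured input keyspace, and logs in when both a username and a password are
     * present in the job configuration. The caller owns the returned client and its
     * underlying transport, hence the suppressed resource warning.
     */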
| @SuppressWarnings("resource") |
| public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception |
| { |
| logger.trace("Creating authenticated client for CF input format"); |
| TTransport transport; |
| try |
| { |
| transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port); |
| } |
| catch (Exception e) |
| { |
| throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e); |
| } |
| TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); |
| Cassandra.Client client = new Cassandra.Client(binaryProtocol); |
| |
| // log in |
| client.set_keyspace(ConfigHelper.getInputKeyspace(conf)); |
| if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null)) |
| { |
| Map<String, String> creds = new HashMap<String, String>(); |
| creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf)); |
| creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf)); |
| AuthenticationRequest authRequest = new AuthenticationRequest(creds); |
| client.login(authRequest); |
| } |
| logger.trace("Authenticated client for CF input format created successfully"); |
| return client; |
| } |
| |
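    /**
     * Creates a record reader for the new-style (mapreduce) API; the framework
     * initializes it through RecordReader.initialize() before the first read.
     */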
    @Override
    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
| { |
| return new ColumnFamilyRecordReader(); |
| } |
| |
    @Override
    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
| { |
| TaskAttemptContext tac = HadoopCompat.newMapContext( |
| jobConf, |
| TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)), |
| null, |
| null, |
| null, |
| new ReporterWrapper(reporter), |
| null); |
| |
        ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(
                jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
| recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); |
| return recordReader; |
| } |
| |
| @Override |
| protected void validateConfiguration(Configuration conf) |
| { |
| super.validateConfiguration(conf); |
| |
| if (ConfigHelper.getInputSlicePredicate(conf) == null) |
| { |
| throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate"); |
| } |
| } |
| |
| } |